diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index e5ad94a42..41ccf0373 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -1131,7 +1131,10 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): gse_op_params = { "meta": {"namespace": constants.GSE_NAMESPACE, "name": meta_name}, "op_type": op_type, + # GSE 1.0 目标对象 "hosts": [{"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id}], + # GSE 2.0 目标对象 + "agent_id_list": [host.bk_agent_id], # 此字段是节点管理自用,仅用于标识,不会被GSE使用 "nodeman_spec": { "process_status_id": process_status.id, @@ -1227,7 +1230,10 @@ def _schedule(self, data, parent_data, callback_data=None): subscription_instance = group_id_instance_map.get(process_status.group_id) proc_name = self.get_plugin_meta_name(plugin, process_status) - gse_proc_key = f"{host.bk_cloud_id}:{host.inner_ip}:{constants.GSE_NAMESPACE}:{proc_name}" + if host.bk_agent_id: + gse_proc_key = f"{host.bk_agent_id}:{constants.GSE_NAMESPACE}:{proc_name}" + else: + gse_proc_key = f"{host.bk_cloud_id}:{host.inner_ip}:{constants.GSE_NAMESPACE}:{proc_name}" proc_operate_result = result["data"].get(gse_proc_key) if not proc_operate_result: self.move_insts_to_failed( diff --git a/apps/backend/subscription/commons.py b/apps/backend/subscription/commons.py index 93b289b7b..deac64f77 100644 --- a/apps/backend/subscription/commons.py +++ b/apps/backend/subscription/commons.py @@ -38,7 +38,7 @@ def get_host_object_attribute(bk_biz_id): def list_biz_hosts(bk_biz_id, condition, func, split_params=False): biz_custom_property = [] kwargs = { - "fields": constants.LIST_BIZ_HOSTS_KWARGS, + "fields": constants.CC_HOST_FIELDS, } if bk_biz_id: biz_custom_property = get_host_object_attribute(bk_biz_id) diff --git a/apps/backend/subscription/render_functions.py b/apps/backend/subscription/render_functions.py index aaa1a7551..30d30dca4 100644 --- a/apps/backend/subscription/render_functions.py +++ b/apps/backend/subscription/render_functions.py @@ -34,7 +34,7 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: if not template_info_list: return [] - fields = constants.FIND_HOST_BY_TEMPLATE_FIELD + fields = constants.CC_HOST_FIELDS if bk_obj_id == models.Subscription.NodeType.SERVICE_TEMPLATE: # 服务模板 diff --git a/apps/backend/subscription/tools.py b/apps/backend/subscription/tools.py index 0eb4b7b0b..05ae8b401 100644 --- a/apps/backend/subscription/tools.py +++ b/apps/backend/subscription/tools.py @@ -377,7 +377,7 @@ def get_host_detail_by_template(bk_obj_id, template_info_list: list, bk_biz_id: if not template_info_list: return [] - fields = constants.FIND_HOST_BY_TEMPLATE_FIELD + fields = constants.CC_HOST_FIELDS if bk_obj_id == models.Subscription.NodeType.SERVICE_TEMPLATE: # 服务模板 diff --git a/apps/mock_data/api_mkd/gse/unit.py b/apps/mock_data/api_mkd/gse/unit.py index 87a1e6015..e7f434f50 100644 --- a/apps/mock_data/api_mkd/gse/unit.py +++ b/apps/mock_data/api_mkd/gse/unit.py @@ -8,6 +8,142 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" +import copy + +from apps.mock_data.common_unit import host +from apps.node_man import constants # 操作类接口一般返回的是 task_id OP_RESULT = {"task_id": "GSETASK:S:202111161138323563236795:143"} +GSE_PROCESS_VERSION = "1.7.17" +GSE_PROCESS_NAME = "test_process" + +GET_AGENT_ALIVE_STATUS_DATA = { + f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}": { + "ip": host.DEFAULT_IP, + "bk_cloud_id": constants.DEFAULT_CLOUD, + "bk_agent_alive": constants.BkAgentStatus.ALIVE.value, + } +} + +GET_AGENT_NOT_ALIVE_STATUS_DATA = { + f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}": { + "ip": host.DEFAULT_IP, + "bk_cloud_id": constants.DEFAULT_CLOUD, + "bk_agent_alive": constants.BkAgentStatus.NOT_ALIVE.value, + } +} + +GET_AGENT_INFO_DATA = { + f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}": { + "ip": host.DEFAULT_IP, + "version": GSE_PROCESS_VERSION, + "bk_cloud_id": 0, + "parent_ip": host.DEFAULT_IP, + "parent_port": 50000, + } +} + +GET_PROC_OPERATE_RESULT = { + f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}:{constants.GSE_NAMESPACE}:sub_870_host_1_host_exp002": { + "content": '{\n "value" : [\n {\n "funcID" : "",\n' + ' "instanceID" : "",\n ' + ' "procName" : "host_exp002",\n "result" : "success",\n' + ' "setupPath" : "/usr/local/gse/external_plugins/sub_870' + '_host_1/host_exp002"\n }\n ]\n}\n', + "error_code": 0, + "error_msg": "success", + } +} + +GET_AGENT_STATE_LIST_DATA = [ + { + "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}", + "bk_cloud_id": constants.DEFAULT_CLOUD, + "version": GSE_PROCESS_VERSION, + "run_mode": 0, + "status_code": constants.GseAgentStatusCode.RUNNING.value, + } +] + +GET_AGENT_NOT_ALIVE_STATE_LIST_DATA = [ + { + "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}", + "bk_cloud_id": constants.DEFAULT_CLOUD, + "version": GSE_PROCESS_VERSION, + "run_mode": 0, + "status_code": constants.GseAgentStatusCode.STOPPED.value, + } +] + +GET_AGENT_INFO_LIST_DATA = [ + { + "code": 0, + "message": "OK", + "bk_agent_id": f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}", + "bk_cloud_id": constants.DEFAULT_CLOUD, + "bk_host_ip": host.DEFAULT_IP, + "bk_os_type": "linux", + "report_time": 1632830304777, + "parent_ip": host.DEFAULT_IP, + "parent_port": 0, + "version": GSE_PROCESS_VERSION, + "cpu_rate": 12.34, + "mem_rate": 56.78, + "start_time": 1632830300, + "run_mode": 0, + "status_code": constants.GseAgentStatusCode.RUNNING.value, + "status": "running", + "last_status_code": constants.GseAgentStatusCode.RUNNING.value, + "last_status": "running", + "remark": "", + } +] + +GET_PROC_STATUS_DATA = { + "proc_infos": [ + { + # 需要根据不同参数补充 hosts 或者 bk_agent_id, 详见 mock_get_proc_status + "status": constants.GseProcessStatusCode.STOPPED.value, + "version": GSE_PROCESS_VERSION, + "isauto": constants.GseProcessAutoCode.AUTO.value, + "meta": { + "name": GSE_PROCESS_NAME, + "namespace": constants.GSE_NAMESPACE, + "labels": {"proc_name": GSE_PROCESS_NAME}, + }, + } + ] +} + + +def mock_get_agent_info_list(params): + agent_info_list = copy.deepcopy(GET_AGENT_INFO_LIST_DATA) + for index, agent_info in enumerate(agent_info_list): + agent_info["bk_agent_id"] = params["agent_id_list"][index] + return agent_info_list + + +def mock_get_agent_state_list(params): + agent_state_list = copy.deepcopy(GET_AGENT_STATE_LIST_DATA) + for index, agent_state in enumerate(agent_state_list): + agent_state["bk_agent_id"] = params["agent_id_list"][index] + return agent_state_list + + +def mock_get_proc_status(params): + pro_status_data = copy.deepcopy(GET_PROC_STATUS_DATA) + hosts_param = params["hosts"][0] + for 
index, host_info in enumerate(params["hosts"]): + if f"{hosts_param['bk_cloud_id']}:{hosts_param['ip']}" == hosts_param["agent_id"]: + pro_status_data["proc_infos"][index]["host"] = { + "ip": hosts_param["ip"], + "bk_cloud_id": hosts_param["bk_cloud_id"], + } + else: + pro_status_data["proc_infos"][index]["bk_agent_id"] = hosts_param["agent_id"] + # 添加返回一个不存在的key来模拟额外返回的case + not_exist_proc_info = copy.deepcopy(pro_status_data["proc_infos"][0]) + not_exist_proc_info["bk_agent_id"] = "not_exist_proc_info_agent_id" + pro_status_data["proc_infos"].append(not_exist_proc_info) + return pro_status_data diff --git a/apps/mock_data/api_mkd/gse/utils.py b/apps/mock_data/api_mkd/gse/utils.py index 20ce84a22..2f3b4ae89 100644 --- a/apps/mock_data/api_mkd/gse/utils.py +++ b/apps/mock_data/api_mkd/gse/utils.py @@ -10,19 +10,50 @@ """ from ... import utils +from . import unit class GseApiMockClient(utils.BaseMockClient): + DEFAULT_OPERATE_PROC_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.OP_RESULT + ) + DEFAULT_GET_OPERATE_RESULT_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_PROC_OPERATE_RESULT + ) + DEFAULT_GET_AGENT_INFO_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_INFO_DATA + ) + DEFAULT_GET_AGENT_STATUS_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_ALIVE_STATUS_DATA + ) + GET_AGENT_NOT_ALIVE_STATUS_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_NOT_ALIVE_STATUS_DATA + ) + DEFAULT_GET_AGENT_INFO_LIST_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_agent_info_list + ) + DEFAULT_GET_AGENT_STATE_LIST_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_agent_state_list + ) + GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=unit.GET_AGENT_NOT_ALIVE_STATE_LIST_DATA + ) + DEFAULT_GET_PROC_STATUS_RETURN = utils.MockReturn( + return_type=utils.MockReturnType.SIDE_EFFECT.value, return_obj=unit.mock_get_proc_status + ) + def __init__( self, - operate_proc_return=None, - operate_proc_multi_return=None, - get_proc_operate_result_return=None, - get_proc_status_return=None, + operate_proc_return=DEFAULT_OPERATE_PROC_RETURN, + operate_proc_multi_return=DEFAULT_OPERATE_PROC_RETURN, + get_proc_operate_result_return=DEFAULT_GET_OPERATE_RESULT_RETURN, + get_proc_status_return=DEFAULT_GET_PROC_STATUS_RETURN, sync_proc_status_return=None, update_proc_info_return=None, - get_agent_info_return=None, - get_agent_status_return=None, + get_agent_info_return=DEFAULT_GET_AGENT_INFO_RETURN, + get_agent_status_return=DEFAULT_GET_AGENT_STATUS_RETURN, + get_agent_info_list_return=DEFAULT_GET_AGENT_INFO_LIST_RETURN, + get_agent_state_list_return=DEFAULT_GET_AGENT_STATE_LIST_RETURN, ): super().__init__() self.operate_proc = self.generate_magic_mock(mock_return_obj=operate_proc_return) @@ -33,3 +64,5 @@ def __init__( self.update_proc_info = self.generate_magic_mock(mock_return_obj=update_proc_info_return) self.get_agent_info = self.generate_magic_mock(mock_return_obj=get_agent_info_return) self.get_agent_status = self.generate_magic_mock(mock_return_obj=get_agent_status_return) + self.get_agent_info_list = 
self.generate_magic_mock(mock_return_obj=get_agent_info_list_return) + self.get_agent_state_list = self.generate_magic_mock(mock_return_obj=get_agent_state_list_return) diff --git a/apps/mock_data/common_unit/__init__.py b/apps/mock_data/common_unit/__init__.py index 3e388431a..e648ac499 100644 --- a/apps/mock_data/common_unit/__init__.py +++ b/apps/mock_data/common_unit/__init__.py @@ -10,6 +10,6 @@ """ -from . import gse, host, job, plugin, subscription +from . import host, job, plugin, subscription -__all__ = ["host", "job", "plugin", "subscription", "gse"] +__all__ = ["host", "job", "plugin", "subscription"] diff --git a/apps/mock_data/common_unit/gse.py b/apps/mock_data/common_unit/gse.py deleted file mode 100644 index 4359129b0..000000000 --- a/apps/mock_data/common_unit/gse.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding: utf-8 -*- -""" -TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. -Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at https://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -specific language governing permissions and limitations under the License. -""" - - -from apps.node_man import constants - -from . import host - -GET_AGENT_STATUS_DATA = { - f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}": { - "ip": host.DEFAULT_IP, - "bk_cloud_id": constants.DEFAULT_CLOUD, - "bk_agent_alive": constants.BkAgentStatus.ALIVE.value, - } -} - -GET_AGENT_INFO_DATA = { - f"{constants.DEFAULT_CLOUD}:{host.DEFAULT_IP}": { - "ip": host.DEFAULT_IP, - "version": "V0.01R060D38", - "bk_cloud_id": 0, - "parent_ip": host.DEFAULT_IP, - "parent_port": 50000, - } -} diff --git a/apps/mock_data/common_unit/host.py b/apps/mock_data/common_unit/host.py index 7fc7130df..4d980407a 100644 --- a/apps/mock_data/common_unit/host.py +++ b/apps/mock_data/common_unit/host.py @@ -8,14 +8,20 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import copy + from apps.node_man import constants, models from .. 
import utils +BK_AGENT_ID = "xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx-xxxx" + DEFAULT_HOST_ID = 1 DEFAULT_IP = "127.0.0.1" +DEFAULT_IPv6 = "ABCD:EF01:2345:6789:ABCD:EF01:2345:6789" + PROXY_INNER_IP = "1.1.1.1" AP_MODEL_DATA = { @@ -91,13 +97,22 @@ "os_type": constants.OsType.LINUX, "cpu_arch": constants.CpuType.x86_64, "node_type": constants.NodeType.AGENT, - "node_from": constants.NodeFrom.NODE_MAN, + "node_from": constants.NodeFrom.CMDB, "ap_id": constants.DEFAULT_AP_ID, "upstream_nodes": [], "is_manual": False, "extra_data": {"bt_speed_limit": None, "peer_exchange_switch_for_agent": 1}, } +HOST_MODEL_DATA_WITH_AGENT_ID = copy.deepcopy(HOST_MODEL_DATA) +HOST_MODEL_DATA_WITH_AGENT_ID.update( + **{ + "bk_agent_id": BK_AGENT_ID, + "inner_ipv6": DEFAULT_IPv6, + "outer_ipv6": DEFAULT_IPv6, + } +) + IDENTITY_MODEL_DATA = { "bk_host_id": DEFAULT_HOST_ID, "auth_type": constants.AuthType.PASSWORD, diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index fe0b476e5..920ba0000 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -32,6 +32,7 @@ # 此值为历史遗留,后续蓝鲸不使用此字段后可废弃 DEFAULT_SUPPLIER_ID = 0 + ######################################################################################################## # 任务超时控制 ######################################################################################################## @@ -47,7 +48,6 @@ class TimeUnit: # JOB 任务超时时间 JOB_TIMEOUT = 30 * TimeUnit.MINUTE - ######################################################################################################## # 周期任务周期 run_every,非特殊统一使用秒作为单位的 int 类型,不使用crontab格式 # 以便与削峰函数 calculate_countdown 所用的 duration 复用 @@ -78,9 +78,6 @@ class TimeUnit: # GSE命名空间 GSE_NAMESPACE = "nodeman" -CC_HOST_FIELDS = ["bk_host_id", "bk_cloud_id", "bk_host_innerip", "bk_host_outerip", "bk_os_type", "bk_os_name"] - - ######################################################################################################## # 字符串常量 ######################################################################################################## @@ -588,40 +585,29 @@ class BkappRunEnvType(Enum): } ) -LIST_BIZ_HOSTS_KWARGS = [ +CC_HOST_FIELDS = [ "bk_host_id", - "bk_os_type", - "bk_host_outerip", - "bk_host_innerip", - "bk_cpu_module", + "bk_agent_id", "bk_cloud_id", - "operator", - "bk_bak_operator", - "bk_os_bit", - "bk_os_name", - "bk_host_name", - "bk_supplier_account", -] - -FIND_HOST_BY_TEMPLATE_FIELD = ( "bk_host_innerip", - "bk_cloud_id", - "bk_host_id", - "bk_biz_id", "bk_host_outerip", + "bk_host_innerip_v6", + "bk_host_outerip_v6", "bk_host_name", - "bk_os_name", "bk_os_type", + "bk_os_name", + "bk_os_bit", + "bk_os_version", + "bk_cpu_module", "operator", "bk_bak_operator", - "bk_state_name", "bk_isp_name", + "bk_biz_id", "bk_province_name", - "bk_supplier_account", - "bk_state", - "bk_os_version", "bk_state", -) + "bk_state_name", + "bk_supplier_account", +] # 限流窗口配置,用于控制CMDB订阅触发的变更频率 # 二元组:防抖时间,防抖解除窗口大小 @@ -792,6 +778,60 @@ class GseOpType(object): } +class GseAgentStatusCode(EnhanceEnum): + NOT_FOUND = -2 + QUERY_FAILED = -1 + INITIAL = 0 + STARTING = 1 + RUNNING = 2 + LOSSY = 3 + BUSY = 4 + STOPPED = 5 + UPGRADING = 6 + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return { + cls.NOT_FOUND: _("未找到"), + cls.QUERY_FAILED: _("查询失败"), + cls.INITIAL: _("初始状态"), + cls.STARTING: _("启动中"), + cls.RUNNING: _("运行中"), + cls.LOSSY: _("有损状态"), + cls.BUSY: _("繁忙状态"), + cls.STOPPED: _("已停止"), + cls.UPGRADING: _("升级中"), + } + + +class GseProcessStatusCode(EnhanceEnum): + 
UNREGISTER = 0 + RUNNING = 1 + STOPPED = 2 + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return { + cls.UNREGISTER: _("未注册"), + cls.RUNNING: _("运行中"), + cls.STOPPED: _("已停止"), + } + + +class GseProcessAutoCode(EnhanceEnum): + """进程是否在GSE托管""" + + AUTO = True + NOT_AUTO = False + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return { + cls.AUTO: _("已托管"), + cls.NOT_AUTO: _("未托管"), + } + + class CmdbObjectId: BIZ = "biz" SET = "set" diff --git a/apps/node_man/migrations/0059_auto_20220315_1852.py b/apps/node_man/migrations/0059_auto_20220315_1852.py new file mode 100644 index 000000000..a198069bc --- /dev/null +++ b/apps/node_man/migrations/0059_auto_20220315_1852.py @@ -0,0 +1,35 @@ +# Generated by Django 3.2.4 on 2022-03-15 10:52 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("node_man", "0058_replenish_plugin_template"), + ] + + operations = [ + migrations.AddField( + model_name="host", + name="bk_agent_id", + field=models.CharField(blank=True, db_index=True, max_length=64, null=True, verbose_name="AgentID"), + ), + migrations.AlterField( + model_name="host", + name="cpu_arch", + field=models.CharField( + choices=[ + ("x86", "x86"), + ("x86_64", "x86_64"), + ("powerpc", "powerpc"), + ("aarch64", "aarch64"), + ("sparc", "sparc"), + ], + db_index=True, + default="x86_64", + max_length=16, + verbose_name="CPU架构", + ), + ), + ] diff --git a/apps/node_man/migrations/0060_auto_20220315_2044.py b/apps/node_man/migrations/0060_auto_20220315_2044.py new file mode 100644 index 000000000..3eebb69a0 --- /dev/null +++ b/apps/node_man/migrations/0060_auto_20220315_2044.py @@ -0,0 +1,43 @@ +# Generated by Django 3.2.4 on 2022-03-15 12:44 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("node_man", "0059_auto_20220315_1852"), + ] + + operations = [ + migrations.AddField( + model_name="host", + name="inner_ipv6", + field=models.CharField(blank=True, default="", max_length=45, null=True, verbose_name="内网IPv6"), + ), + migrations.AddField( + model_name="host", + name="outer_ipv6", + field=models.CharField(blank=True, default="", max_length=45, null=True, verbose_name="外网IPv6"), + ), + migrations.AlterField( + model_name="host", + name="data_ip", + field=models.CharField(blank=True, default="", max_length=15, null=True, verbose_name="数据IP"), + ), + migrations.AlterField( + model_name="host", + name="inner_ip", + field=models.CharField(db_index=True, max_length=15, verbose_name="内网IP"), + ), + migrations.AlterField( + model_name="host", + name="login_ip", + field=models.CharField(blank=True, default="", max_length=15, null=True, verbose_name="登录IP"), + ), + migrations.AlterField( + model_name="host", + name="outer_ip", + field=models.CharField(blank=True, default="", max_length=15, null=True, verbose_name="外网IP"), + ), + ] diff --git a/apps/node_man/models.py b/apps/node_man/models.py index f17f788d8..427e6c644 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -307,19 +307,23 @@ class Meta: class Host(models.Model): bk_host_id = models.IntegerField(_("CMDB主机ID"), primary_key=True) + bk_agent_id = models.CharField(_("AgentID"), max_length=64, db_index=True, blank=True, null=True) bk_biz_id = models.IntegerField(_("业务ID"), db_index=True) bk_cloud_id = models.IntegerField(_("云区域ID"), db_index=True) - inner_ip = models.CharField(_("内网IP"), max_length=45, db_index=True) - outer_ip = models.CharField(_("外网IP"), 
max_length=45, blank=True, null=True, default="") - login_ip = models.CharField(_("登录IP"), max_length=45, blank=True, null=True, default="") - data_ip = models.CharField(_("数据IP"), max_length=45, blank=True, null=True, default="") + inner_ip = models.CharField(_("内网IP"), max_length=15, db_index=True) + outer_ip = models.CharField(_("外网IP"), max_length=15, blank=True, null=True, default="") + login_ip = models.CharField(_("登录IP"), max_length=15, blank=True, null=True, default="") + data_ip = models.CharField(_("数据IP"), max_length=15, blank=True, null=True, default="") + + inner_ipv6 = models.CharField(_("内网IPv6"), max_length=45, blank=True, null=True, default="") + outer_ipv6 = models.CharField(_("外网IPv6"), max_length=45, blank=True, null=True, default="") os_type = models.CharField( _("操作系统"), max_length=16, choices=constants.OS_CHOICES, default=constants.OsType.LINUX, db_index=True ) cpu_arch = models.CharField( - _("操作系统"), max_length=16, choices=constants.CPU_CHOICES, default=constants.CpuType.x86_64, db_index=True + _("CPU架构"), max_length=16, choices=constants.CPU_CHOICES, default=constants.CpuType.x86_64, db_index=True ) node_type = models.CharField(_("节点类型"), max_length=16, choices=constants.NODE_CHOICES, db_index=True) node_from = models.CharField(_("节点来源"), max_length=45, choices=constants.NODE_FROM_CHOICES, default="NODE_MAN") diff --git a/apps/node_man/periodic_tasks/resource_watch_task.py b/apps/node_man/periodic_tasks/resource_watch_task.py index f8d80883f..d9b10fee6 100644 --- a/apps/node_man/periodic_tasks/resource_watch_task.py +++ b/apps/node_man/periodic_tasks/resource_watch_task.py @@ -18,7 +18,7 @@ from django.db.utils import IntegrityError from apps.component.esbclient import client_v2 -from apps.node_man import constants as const +from apps.node_man import constants from apps.node_man.models import ( AccessPoint, GlobalSettings, @@ -135,7 +135,7 @@ def sync_resource_watch_host_event(): 拉取主机事件 """ kwargs = { - "bk_resource": const.ResourceType.host, + "bk_resource": constants.ResourceType.host, "bk_fields": ["bk_host_innerip", "bk_os_type", "bk_host_id", "bk_cloud_id", "bk_host_outerip"], } @@ -147,7 +147,7 @@ def sync_resource_watch_host_relation_event(): 拉取主机关系事件 """ kwargs = { - "bk_resource": const.ResourceType.host_relation, + "bk_resource": constants.ResourceType.host_relation, } _resource_watch(RESOURCE_WATCH_HOST_RELATION_CURSOR_KEY, kwargs) @@ -157,7 +157,7 @@ def sync_resource_watch_process_event(): """ 拉取进程 """ - kwargs = {"bk_resource": const.ResourceType.process} + kwargs = {"bk_resource": constants.ResourceType.process} _resource_watch(RESOURCE_WATCH_PROCESS_CURSOR_KEY, kwargs) @@ -170,7 +170,7 @@ def delete_host(bk_host_id): def list_biz_host(bk_biz_id, bk_host_id): kwargs = { "bk_biz_id": bk_biz_id, - "fields": ["bk_host_id", "bk_cloud_id", "bk_host_innerip", "bk_host_outerip", "bk_os_type", "bk_os_name"], + "fields": constants.CC_HOST_FIELDS, "host_property_filter": { "condition": "AND", "rules": [{"field": "bk_host_id", "operator": "equal", "value": bk_host_id}], @@ -199,11 +199,11 @@ def apply_resource_watched_events(): event_bk_biz_id = event.bk_detail.get("bk_biz_id") try: - if event.bk_event_type in ["update", "create"] and event.bk_resource == const.ResourceType.host: + if event.bk_event_type in ["update", "create"] and event.bk_resource == constants.ResourceType.host: _, need_delete_host_ids = update_or_create_host_base(None, None, [event.bk_detail]) if need_delete_host_ids: delete_host(need_delete_host_ids[0]) - elif event.bk_event_type in 
["create"] and event.bk_resource == const.ResourceType.host_relation: + elif event.bk_event_type in ["create"] and event.bk_resource == constants.ResourceType.host_relation: # 查询主机信息创建或者更新 bk_host_id = event.bk_detail["bk_host_id"] host_obj = Host.objects.filter(bk_host_id=bk_host_id).first() @@ -216,10 +216,18 @@ def apply_resource_watched_events(): host = list_biz_host(event_bk_biz_id, bk_host_id) if host: ap_id = ( - const.DEFAULT_AP_ID if AccessPoint.objects.count() > 1 else AccessPoint.objects.first().id + constants.DEFAULT_AP_ID + if AccessPoint.objects.count() > 1 + else AccessPoint.objects.first().id ) host_data, identify_data, process_status_data = _generate_host( - event_bk_biz_id, host, host["bk_host_innerip"], host["bk_host_outerip"], ap_id + event_bk_biz_id, + host, + host["bk_host_innerip"], + host["bk_host_outerip"], + host.get("bk_host_innerip_v6"), + host.get("bk_host_outerip_v6"), + ap_id, ) # 与注册CC原子存在同时写入的可能,防止更新进行强制插入 try: @@ -273,7 +281,7 @@ def cal_next_debounce_window(current_time): """ 计算出下一次的防抖窗口 """ - windows = const.CMDB_SUBSCRIPTION_DEBOUNCE_WINDOWS + windows = constants.CMDB_SUBSCRIPTION_DEBOUNCE_WINDOWS if current_time < windows[0][0]: # 防抖时间下限 diff --git a/apps/node_man/periodic_tasks/sync_agent_status_task.py b/apps/node_man/periodic_tasks/sync_agent_status_task.py index c553a38f3..1aa2645ed 100644 --- a/apps/node_man/periodic_tasks/sync_agent_status_task.py +++ b/apps/node_man/periodic_tasks/sync_agent_status_task.py @@ -9,17 +9,24 @@ specific language governing permissions and limitations under the License. """ from celery.task import periodic_task, task +from django.conf import settings +from django.db.models import Q -from apps.component.esbclient import client_v2 from apps.node_man import constants from apps.node_man.models import Host, ProcessStatus from apps.utils.periodic_task import calculate_countdown +from common.api import GseApi from common.log import logger @task(queue="default", ignore_result=True) -def update_or_create_host_agent_status(task_id, start, end): - hosts = Host.objects.values("bk_host_id", "bk_cloud_id", "inner_ip", "node_from")[start:end] +def update_or_create_host_agent_status(task_id, start, end, has_agent_id=False): + # GSE接口不支持同时传 {cloud_id}:{ip} + {agent_id} 混合模式,因此需要分开处理 + if has_agent_id: + host_queryset = Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact="") + else: + host_queryset = Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact="")) + hosts = host_queryset.values("bk_host_id", "bk_agent_id", "bk_cloud_id", "inner_ip", "node_from")[start:end] if not hosts: # 结束递归 return @@ -31,22 +38,41 @@ def update_or_create_host_agent_status(task_id, start, end): node_from_map = {} # 生成查询参数host弄表 + agent_id_list = [] query_host = [] for host in hosts: - # IP 为空,直接跳过 - if not host["inner_ip"]: - continue - bk_host_id_map[f"{host['bk_cloud_id']}:{host['inner_ip']}"] = host["bk_host_id"] - node_from_map[f"{host['bk_cloud_id']}:{host['inner_ip']}"] = host["node_from"] + if host["bk_agent_id"] and settings.GSE_VERSION != "V1": + agent_id = host["bk_agent_id"] + else: + agent_id = f"{host['bk_cloud_id']}:{host['inner_ip']}" + bk_host_id_map[agent_id] = host["bk_host_id"] + node_from_map[agent_id] = host["node_from"] query_host.append({"ip": host["inner_ip"], "bk_cloud_id": host["bk_cloud_id"]}) + agent_id_list.append(agent_id) # 查询agent状态和版本信息 - agent_status_data = client_v2.gse.get_agent_status({"hosts": query_host}) - agent_info_data = client_v2.gse.get_agent_info({"hosts": query_host}) + 
if settings.GSE_VERSION == "V1": + agent_status_data = GseApi.get_agent_status({"hosts": query_host}) + agent_info_data = GseApi.get_agent_info({"hosts": query_host}) + else: + # GSE2.0 接口,补充GSE1.0字段 + agent_status_list = GseApi.get_agent_state_list({"agent_id_list": agent_id_list}) + agent_status_data = {} + for agent_status in agent_status_list: + if agent_status.get("status_code") == constants.GseAgentStatusCode.RUNNING.value: + agent_status["bk_agent_alive"] = constants.BkAgentStatus.ALIVE.value + else: + agent_status["bk_agent_alive"] = constants.BkAgentStatus.NOT_ALIVE.value + agent_status_data[agent_status["bk_agent_id"]] = agent_status + + agent_info_list = GseApi.get_agent_info_list({"agent_id_list": agent_id_list}) + agent_info_data = {agent_info["bk_agent_id"]: agent_info for agent_info in agent_info_list} # 查询需要更新主机的ProcessStatus对象 process_status_objs = ProcessStatus.objects.filter( - name="gseagent", bk_host_id__in=bk_host_id_map.values(), source_type=ProcessStatus.SourceType.DEFAULT + name=ProcessStatus.GSE_AGENT_PROCESS_NAME, + bk_host_id__in=bk_host_id_map.values(), + source_type=ProcessStatus.SourceType.DEFAULT, ).values("bk_host_id", "id", "status") # 生成bk_host_id与ProcessStatus对象的映射 @@ -63,31 +89,31 @@ def update_or_create_host_agent_status(task_id, start, end): is_running = host_info["bk_agent_alive"] == constants.BkAgentStatus.ALIVE.value version = constants.VERSION_PATTERN.search(agent_info_data[key]["version"]) + if is_running: + status = constants.ProcStateType.RUNNING + if node_from_map[key] == constants.NodeFrom.CMDB: + need_update_node_from_host.append( + Host(bk_host_id=bk_host_id_map[key], node_from=constants.NodeFrom.NODE_MAN) + ) + else: + # 状态为0时如果节点管理为CMDB标记为未安装否则为异常 + if node_from_map[key] == constants.NodeFrom.CMDB: + # NOT_INSTALLED + status = constants.ProcStateType.NOT_INSTALLED + else: + # TERMINATED + status = constants.ProcStateType.TERMINATED + if not process_status_id: # 如果不存在ProcessStatus对象需要创建 to_be_created_status.append( ProcessStatus( bk_host_id=bk_host_id_map[key], - status=constants.PROC_STATUS_DICT[host_info["bk_agent_alive"]], + status=status, version=version.group() if version else "", ) ) else: - if is_running: - status = constants.PROC_STATUS_DICT[host_info["bk_agent_alive"]] - if node_from_map[key] == constants.NodeFrom.CMDB: - need_update_node_from_host.append( - Host(bk_host_id=bk_host_id_map[key], node_from=constants.NodeFrom.NODE_MAN) - ) - else: - # 状态为0时如果节点管理为CMDB标记为未安装否则为异常 - if node_from_map[key] == constants.NodeFrom.CMDB: - # NOT_INSTALLED - status = constants.ProcStateType.NOT_INSTALLED - else: - # TERMINATED - status = constants.ProcStateType.TERMINATED - process_objs.append( ProcessStatus( id=process_status_id, status=status, version=version.group() if (version and is_running) else "" @@ -113,14 +139,18 @@ def sync_agent_status_periodic_task(): """ task_id = sync_agent_status_periodic_task.request.id logger.info(f"{task_id} | sync_agent_status_task: Start syncing host status.") - count = Host.objects.count() - for start in range(0, count, constants.QUERY_AGENT_STATUS_HOST_LENS): - countdown = calculate_countdown( - count=count / constants.QUERY_AGENT_STATUS_HOST_LENS, - index=start / constants.QUERY_AGENT_STATUS_HOST_LENS, - duration=constants.SYNC_AGENT_STATUS_TASK_INTERVAL, - ) - logger.info(f"{task_id} | sync_agent_status_task after {countdown} seconds") - update_or_create_host_agent_status.apply_async( - (task_id, start, start + constants.QUERY_AGENT_STATUS_HOST_LENS), countdown=countdown - ) + for (has_agent_id, 
queryset) in [ + (True, Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact="")), + (False, Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact=""))), + ]: + count = queryset.count() + for start in range(0, count, constants.QUERY_AGENT_STATUS_HOST_LENS): + countdown = calculate_countdown( + count=count / constants.QUERY_AGENT_STATUS_HOST_LENS, + index=start / constants.QUERY_AGENT_STATUS_HOST_LENS, + duration=constants.SYNC_AGENT_STATUS_TASK_INTERVAL, + ) + logger.info(f"{task_id} | sync_agent_status_task after {countdown} seconds") + update_or_create_host_agent_status.apply_async( + (task_id, start, start + constants.QUERY_AGENT_STATUS_HOST_LENS, has_agent_id), countdown=countdown + ) diff --git a/apps/node_man/periodic_tasks/sync_cmdb_host.py b/apps/node_man/periodic_tasks/sync_cmdb_host.py index 71e436c36..59a17cf7e 100644 --- a/apps/node_man/periodic_tasks/sync_cmdb_host.py +++ b/apps/node_man/periodic_tasks/sync_cmdb_host.py @@ -16,8 +16,7 @@ from apps.component.esbclient import client_v2 from apps.exceptions import ComponentCallError -from apps.node_man import constants as const -from apps.node_man import models, tools +from apps.node_man import constants, models, tools from common.log import logger @@ -44,8 +43,8 @@ def _list_biz_hosts(biz_id: int, start: int) -> dict: biz_hosts = client_v2.cc.list_biz_hosts( { "bk_biz_id": biz_id, - "fields": const.CC_HOST_FIELDS, - "page": {"start": start, "limit": const.QUERY_CMDB_LIMIT, "sort": "bk_host_id"}, + "fields": constants.CC_HOST_FIELDS, + "page": {"start": start, "limit": constants.QUERY_CMDB_LIMIT, "sort": "bk_host_id"}, } ) # 去除内网IP为空的主机 @@ -57,8 +56,8 @@ def _list_resource_pool_hosts(start): try: result = client_v2.cc.list_resource_pool_hosts( { - "page": {"start": start, "limit": const.QUERY_CMDB_LIMIT, "sort": "bk_host_id"}, - "fields": const.CC_HOST_FIELDS, + "page": {"start": start, "limit": constants.QUERY_CMDB_LIMIT, "sort": "bk_host_id"}, + "fields": constants.CC_HOST_FIELDS, } ) return result @@ -66,44 +65,56 @@ def _list_resource_pool_hosts(start): return {"info": []} -def _bulk_update_host(hosts, fields): +def _bulk_update_host(hosts, extra_fields): + update_fields = ["bk_cloud_id", "inner_ip", "outer_ip", "inner_ipv6", "outer_ipv6", "bk_agent_id"] + extra_fields if hosts: - models.Host.objects.bulk_update(hosts, fields=fields) + models.Host.objects.bulk_update(hosts, fields=update_fields) -def _generate_host(biz_id, host, bk_host_innerip, bk_host_outerip, ap_id): +def _generate_host(biz_id, host, bk_host_innerip, bk_host_outerip, bk_host_innerip_v6, bk_host_outerip_v6, ap_id): os_type = tools.HostV2Tools.get_os_type(host) cpu_arch = tools.HostV2Tools.get_cpu_arch(host) host_data = models.Host( bk_host_id=host["bk_host_id"], + bk_agent_id=host.get("bk_agent_id"), bk_biz_id=biz_id, bk_cloud_id=host["bk_cloud_id"], inner_ip=bk_host_innerip, outer_ip=bk_host_outerip, - node_from=const.NodeFrom.CMDB, + inner_ipv6=bk_host_innerip_v6, + outer_ipv6=bk_host_outerip_v6, + node_from=constants.NodeFrom.CMDB, os_type=os_type, cpu_arch=cpu_arch, - node_type=const.NodeType.AGENT if host["bk_cloud_id"] == const.DEFAULT_CLOUD else const.NodeType.PAGENT, + node_type=constants.NodeType.AGENT + if host["bk_cloud_id"] == constants.DEFAULT_CLOUD + else constants.NodeType.PAGENT, ap_id=ap_id, ) identify_data = models.IdentityData( bk_host_id=host["bk_host_id"], auth_type="PASSWORD", - account=const.WINDOWS_ACCOUNT if os_type == const.OsType.WINDOWS else const.LINUX_ACCOUNT, - port=const.WINDOWS_PORT if 
os_type == const.OsType.WINDOWS else settings.BKAPP_DEFAULT_SSH_PORT, + account=constants.WINDOWS_ACCOUNT if os_type == constants.OsType.WINDOWS else constants.LINUX_ACCOUNT, + port=constants.WINDOWS_PORT if os_type == constants.OsType.WINDOWS else settings.BKAPP_DEFAULT_SSH_PORT, ) - process_status_data = models.ProcessStatus(bk_host_id=host["bk_host_id"], name="gseagent") + process_status_data = models.ProcessStatus( + bk_host_id=host["bk_host_id"], name=models.ProcessStatus.GSE_AGENT_PROCESS_NAME + ) return host_data, identify_data, process_status_data def find_host_biz_relations(find_host_biz_ids): host_biz_relation = {} - for count in range(math.ceil(len(find_host_biz_ids) / const.QUERY_CMDB_LIMIT)): + for count in range(math.ceil(len(find_host_biz_ids) / constants.QUERY_CMDB_LIMIT)): cc_host_biz_relations = client_v2.cc.find_host_biz_relations( - {"bk_host_id": find_host_biz_ids[count * const.QUERY_CMDB_LIMIT : (count + 1) * const.QUERY_CMDB_LIMIT]} + { + "bk_host_id": find_host_biz_ids[ + count * constants.QUERY_CMDB_LIMIT : (count + 1) * constants.QUERY_CMDB_LIMIT + ] + } ) for _host_biz in cc_host_biz_relations: host_biz_relation[_host_biz["bk_host_id"]] = _host_biz["bk_biz_id"] @@ -115,12 +126,12 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): bk_host_ids = [_host["bk_host_id"] for _host in cmdb_host_data] # 查询节点管理已存在的主机 - exist_proxy_host_ids = models.Host.objects.filter(bk_host_id__in=bk_host_ids, node_type="PROXY").values_list( - "bk_host_id", flat=True - ) + exist_proxy_host_ids = models.Host.objects.filter( + bk_host_id__in=bk_host_ids, node_type=constants.NodeType.PROXY + ).values_list("bk_host_id", flat=True) exist_agent_host_ids = ( models.Host.objects.filter(bk_host_id__in=bk_host_ids) - .exclude(node_type="PROXY") + .exclude(node_type=constants.NodeType.PROXY) .values_list("bk_host_id", flat=True) ) @@ -134,7 +145,7 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): need_create_host_without_biz = [] need_delete_host_ids = [] - ap_id = const.DEFAULT_AP_ID if models.AccessPoint.objects.count() > 1 else models.AccessPoint.objects.first().id + ap_id = constants.DEFAULT_AP_ID if models.AccessPoint.objects.count() > 1 else models.AccessPoint.objects.first().id # 已存在的主机批量更新,不存在的主机批量创建 for host in cmdb_host_data: @@ -157,12 +168,17 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): bk_host_innerip = host["bk_host_innerip"] bk_host_outerip = host["bk_host_outerip"] + bk_host_innerip_v6 = host.get("bk_host_innerip_v6") + bk_host_outerip_v6 = host.get("bk_host_outerip_v6") if host["bk_host_id"] in exist_agent_host_ids: host_params = { "bk_host_id": host["bk_host_id"], "bk_cloud_id": host["bk_cloud_id"], + "bk_agent_id": host.get("bk_agent_id"), "inner_ip": bk_host_innerip, "outer_ip": bk_host_outerip, + "inner_ipv6": bk_host_innerip_v6, + "outer_ipv6": bk_host_outerip_v6, } os_type = tools.HostV2Tools.get_os_type(host) @@ -182,10 +198,13 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): elif host["bk_host_id"] in exist_proxy_host_ids: host_params = { "bk_host_id": host["bk_host_id"], + "bk_agent_id": host.get("bk_agent_id"), "bk_cloud_id": host["bk_cloud_id"], "inner_ip": bk_host_innerip, "outer_ip": bk_host_outerip, - "os_type": const.OsType.LINUX, + "inner_ipv6": bk_host_innerip_v6, + "outer_ipv6": bk_host_outerip_v6, + "os_type": constants.OsType.LINUX, } if biz_id: host_params["bk_biz_id"] = biz_id @@ -198,7 +217,7 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): 
need_create_host_without_biz.append(host) else: host_data, identify_data, process_status_data = _generate_host( - biz_id, host, bk_host_innerip, bk_host_outerip, ap_id + biz_id, host, bk_host_innerip, bk_host_outerip, bk_host_innerip_v6, bk_host_outerip_v6, ap_id ) need_create_hosts.append(host_data) host_identity_objs.append(identify_data) @@ -218,16 +237,18 @@ def update_or_create_host_base(biz_id, task_id, cmdb_host_data): need_create_host, need_create_host["bk_host_innerip"].split(",")[0], need_create_host["bk_host_outerip"].split(",")[0], + need_create_host.get("bk_host_innerip_v6"), + need_create_host.get("bk_host_outerip_v6"), ap_id, ) need_create_hosts.append(host_data) host_identity_objs.append(identify_data) process_status_objs.append(process_status_data) - _bulk_update_host(need_update_hosts, ["bk_biz_id", "bk_cloud_id", "inner_ip", "outer_ip", "os_type"]) - _bulk_update_host(need_update_hosts_without_biz, ["bk_cloud_id", "inner_ip", "outer_ip", "os_type"]) - _bulk_update_host(need_update_hosts_without_os, ["bk_biz_id", "bk_cloud_id", "inner_ip", "outer_ip"]) - _bulk_update_host(need_update_hosts_without_biz_os, ["bk_cloud_id", "inner_ip", "outer_ip"]) + _bulk_update_host(need_update_hosts, ["bk_biz_id", "os_type"]) + _bulk_update_host(need_update_hosts_without_biz, ["os_type"]) + _bulk_update_host(need_update_hosts_without_os, ["bk_biz_id"]) + _bulk_update_host(need_update_hosts_without_biz_os, []) if need_create_hosts: models.Host.objects.bulk_create(need_create_hosts) @@ -248,14 +269,14 @@ def _update_or_create_host(biz_id, start=0, task_id=None): logger.info( f"{task_id} | sync_cmdb_host biz:[{biz_id}] " - f"host count: [{host_count}] current sync[{start}-{start + const.QUERY_CMDB_LIMIT}]" + f"host count: [{host_count}] current sync[{start}-{start + constants.QUERY_CMDB_LIMIT}]" ) bk_host_ids, _ = update_or_create_host_base(biz_id, task_id, host_data) # 递归 - if host_count > start + const.QUERY_CMDB_LIMIT: - bk_host_ids += _update_or_create_host(biz_id, start + const.QUERY_CMDB_LIMIT, task_id=task_id) + if host_count > start + constants.QUERY_CMDB_LIMIT: + bk_host_ids += _update_or_create_host(biz_id, start + constants.QUERY_CMDB_LIMIT, task_id=task_id) return bk_host_ids diff --git a/apps/node_man/periodic_tasks/sync_proc_status_task.py b/apps/node_man/periodic_tasks/sync_proc_status_task.py index 60d8c0eba..247b93caf 100644 --- a/apps/node_man/periodic_tasks/sync_proc_status_task.py +++ b/apps/node_man/periodic_tasks/sync_proc_status_task.py @@ -8,15 +8,15 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" -import time from celery.task import periodic_task, task from django.conf import settings +from django.db.models import Q -from apps.component.esbclient import client_v2 from apps.node_man import constants from apps.node_man.models import Host, ProcessStatus from apps.utils.periodic_task import calculate_countdown +from common.api import GseApi from common.log import logger @@ -35,10 +35,11 @@ def query_proc_status(proc_name, host_list): 5000: 2s-4s (2.3s 2.2s 4.1s 4.3s) """ kwargs = { - "meta": {"namespace": "nodeman", "name": proc_name, "labels": {"proc_name": proc_name}}, + "meta": {"namespace": constants.GSE_NAMESPACE, "name": proc_name, "labels": {"proc_name": proc_name}}, "hosts": host_list, + "agent_id_list": [host["agent_id"] for host in host_list], } - data = client_v2.gse.get_proc_status(kwargs) + data = GseApi.get_proc_status(kwargs) return data.get("proc_infos") or [] @@ -48,17 +49,23 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): host_ip_cloud_list = [] bk_host_id_map = {} for host in hosts: - host_ip_cloud_list.append({"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id}) - bk_host_id_map[f"{host.inner_ip}:{host.bk_cloud_id}"] = host.bk_host_id + if host.bk_agent_id: + agent_id = host.bk_agent_id + else: + agent_id = f"{host.bk_cloud_id}:{host.inner_ip}" + host_ip_cloud_list.append({"ip": host.inner_ip, "bk_cloud_id": host.bk_cloud_id, "agent_id": agent_id}) + bk_host_id_map[agent_id] = host.bk_host_id for proc_name in sync_proc_list: - past = time.time() proc_infos = query_proc_status(proc_name, host_ip_cloud_list) - logger.info(f"this get_proc_status cost {time.time() - past}s") host_proc_status_map = {} for info in proc_infos: - host_proc_status_map[f'{info["host"]["ip"]}:{info["host"]["bk_cloud_id"]}'] = { + if "bk_agent_id" in info: + key = info["bk_agent_id"] + else: + key = f'{info["host"]["bk_cloud_id"]}:{info["host"]["ip"]}' + host_proc_status_map[key] = { "version": get_version(info["version"]), "status": constants.PLUGIN_STATUS_DICT[info["status"]], "is_auto": constants.AutoStateType.AUTO if info["isauto"] else constants.AutoStateType.UNAUTO, @@ -80,10 +87,10 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): need_update_status = [] need_create_status = [] - for host_cloud_key, host_proc_info in host_proc_status_map.items(): - if host_cloud_key not in bk_host_id_map: + for agent_id, host_proc_info in host_proc_status_map.items(): + if agent_id not in bk_host_id_map: continue - db_proc_info = host_proc_key__proc_map.get(f'{host_proc_info["name"]}:{bk_host_id_map[host_cloud_key]}') + db_proc_info = host_proc_key__proc_map.get(f'{host_proc_info["name"]}:{bk_host_id_map[agent_id]}') # 如果DB中进程状态为手动停止,并且同步回来的进程状态为终止,此时保持手动停止的标记,用于订阅的豁免操作 if ( @@ -111,16 +118,16 @@ def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): name=host_proc_info["name"], source_type=ProcessStatus.SourceType.DEFAULT, proc_type=constants.ProcType.PLUGIN, - bk_host_id=bk_host_id_map[host_cloud_key], + bk_host_id=bk_host_id_map[agent_id], is_latest=True, ) # 忽略无用的进程信息 - if obj.status != "UNREGISTER": + if obj.status != constants.ProcStateType.UNREGISTER: need_create_status.append(obj) ProcessStatus.objects.bulk_update(need_update_status, fields=["status", "version", "is_auto"]) ProcessStatus.objects.bulk_create(need_create_status) - logger.info(f"{task_id} | Sync process status start flag: {start} complate") + logger.info(f"{task_id} | Sync process status start flag: {start} complete") @periodic_task( @@ -131,22 +138,26 @@ 
def update_or_create_proc_status(task_id, hosts, sync_proc_list, start): def sync_proc_status_periodic_task(): sync_proc_list = settings.HEAD_PLUGINS task_id = sync_proc_status_periodic_task.request.id - hosts = Host.objects.all() - count = hosts.count() - logger.info(f"{task_id} | sync host proc status... host_count={count}.") - - for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS): - countdown = calculate_countdown( - count=count / constants.QUERY_PROC_STATUS_HOST_LENS, - index=start / constants.QUERY_PROC_STATUS_HOST_LENS, - duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL, - ) - logger.info(f"{task_id} | sync host proc status after {countdown} seconds") - - # (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start) - update_or_create_proc_status.apply_async( - (task_id, hosts[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start), - countdown=countdown, - ) - - logger.info(f"{task_id} | sync host proc status complate.") + host_queryset_list = [ + Host.objects.exclude(bk_agent_id__isnull=True).exclude(bk_agent_id__exact=""), + Host.objects.filter(Q(bk_agent_id__isnull=True) | Q(bk_agent_id__exact="")), + ] + for host_queryset in host_queryset_list: + count = host_queryset.count() + logger.info(f"{task_id} | sync host proc status... host_count={count}.") + + for start in range(0, count, constants.QUERY_PROC_STATUS_HOST_LENS): + countdown = calculate_countdown( + count=count / constants.QUERY_PROC_STATUS_HOST_LENS, + index=start / constants.QUERY_PROC_STATUS_HOST_LENS, + duration=constants.SYNC_PROC_STATUS_TASK_INTERVAL, + ) + logger.info(f"{task_id} | sync host proc status after {countdown} seconds") + + # (task_id, hosts[start: start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start) + update_or_create_proc_status.apply_async( + (task_id, host_queryset[start : start + constants.QUERY_PROC_STATUS_HOST_LENS], sync_proc_list, start), + countdown=countdown, + ) + + logger.info(f"{task_id} | sync host proc status complete.") diff --git a/apps/node_man/tests/test_pericdic_tasks/mock_data.py b/apps/node_man/tests/test_pericdic_tasks/mock_data.py index f3afbfa8a..dc99cb591 100644 --- a/apps/node_man/tests/test_pericdic_tasks/mock_data.py +++ b/apps/node_man/tests/test_pericdic_tasks/mock_data.py @@ -21,6 +21,7 @@ MOCK_BK_CLOUD_ID = 99999 MOCK_IP = "127.0.0.1" MOCK_HOST_ID = 233333 +MOCK_AGENT_ID = "35103d95b447465a944384968d6222cc35103d95b447465a944384968d6222cc" # 主机数量(用于批量构造) MOCK_HOST_NUM = 10 # 其他各种ID diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py index 4bb9b4e8c..939b01850 100644 --- a/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py +++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_agent_status_task.py @@ -8,38 +8,80 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
""" - from unittest.mock import patch +from django.test import override_settings + +from apps.mock_data.api_mkd.gse.unit import GSE_PROCESS_VERSION +from apps.mock_data.api_mkd.gse.utils import GseApiMockClient +from apps.mock_data.common_unit.host import ( + HOST_MODEL_DATA, + HOST_MODEL_DATA_WITH_AGENT_ID, +) from apps.node_man import constants from apps.node_man.models import Host, ProcessStatus from apps.node_man.periodic_tasks.sync_agent_status_task import ( + sync_agent_status_periodic_task, update_or_create_host_agent_status, ) from apps.utils.unittest.testcase import CustomBaseTestCase -from .mock_data import MOCK_GET_AGENT_STATUS, MOCK_HOST -from .utils import MockClient, modify_constant_data - class TestSyncAgentStatus(CustomBaseTestCase): - @patch("apps.node_man.periodic_tasks.sync_agent_status_task.client_v2", MockClient) - def test_update_or_create_host_agent_status(self): - host, _ = Host.objects.get_or_create(**MOCK_HOST) + def test_sync_agent_status_periodic_task(self): + Host.objects.create(**HOST_MODEL_DATA) + sync_agent_status_periodic_task() + + def test_update_or_create_host_agent_status_no_host(self): + update_or_create_host_agent_status(None, 0, 1) + self.assertEqual(ProcessStatus.objects.count(), 0) + + @patch("apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", GseApiMockClient()) + def test_update_or_create_host_agent_status_alive(self): + host = Host.objects.create(**HOST_MODEL_DATA) + # 测试创建ProcessStatus对象 + update_or_create_host_agent_status(None, 0, 1) + process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) + self.assertEqual(process_status.status, constants.ProcStateType.RUNNING) + self.assertEqual(process_status.version, GSE_PROCESS_VERSION) + host = Host.objects.get(bk_host_id=host.bk_host_id) + self.assertEqual(host.node_from, constants.NodeFrom.NODE_MAN) + @patch( + "apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", + GseApiMockClient(get_agent_status_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATUS_RETURN), + ) + def test_update_or_create_host_agent_status_not_alive(self): + host = Host.objects.create(**HOST_MODEL_DATA) # 测试创建ProcessStatus对象 update_or_create_host_agent_status(None, 0, 1) - self.assertEqual(ProcessStatus.objects.count(), 1) + process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) + self.assertEqual(process_status.status, constants.ProcStateType.NOT_INSTALLED) + + # Agent异常且已在节点管理录入过的,标记状态为异常 + host.node_from = constants.NodeFrom.NODE_MAN + host.save() + update_or_create_host_agent_status(None, 0, 1) + process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) + self.assertEqual(process_status.status, constants.ProcStateType.TERMINATED) - # agent状态为alive时 更新主机信息node_from为NODE_MAN + @override_settings(GSE_VERSION="V2") + @patch("apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", GseApiMockClient()) + def test_update_or_create_host_agent_status_alive_gse_v2(self): + host = Host.objects.create(**HOST_MODEL_DATA_WITH_AGENT_ID) + # 测试创建ProcessStatus对象 + update_or_create_host_agent_status(None, 0, 1, has_agent_id=True) + process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) + self.assertEqual(process_status.status, constants.ProcStateType.RUNNING) + + @override_settings(GSE_VERSION="V2") + @patch( + "apps.node_man.periodic_tasks.sync_agent_status_task.GseApi", + GseApiMockClient(get_agent_state_list_return=GseApiMockClient.GET_AGENT_NOT_ALIVE_STATE_LIST_RETURN), + ) + def test_update_or_create_host_agent_status_not_alive_gse_v2(self): + host = 
Host.objects.create(**HOST_MODEL_DATA) + # 测试创建ProcessStatus对象 update_or_create_host_agent_status(None, 0, 1) - self.assertEqual(Host.objects.get(bk_host_id=host.bk_host_id).node_from, constants.NodeFrom.NODE_MAN) - - # agent状态信息为not alive时 更新进程信息为NOT_INSTALLED/TERMINATED - with modify_constant_data( - [(MOCK_GET_AGENT_STATUS[f"{host.bk_cloud_id}:{host.inner_ip}"], {"bk_agent_alive": 0})] - ): - update_or_create_host_agent_status(None, 0, 1) - self.assertEqual( - ProcessStatus.objects.get(bk_host_id=host.bk_host_id).status, constants.PROC_STATUS_DICT[2] - ) + process_status = ProcessStatus.objects.get(bk_host_id=host.bk_host_id) + self.assertEqual(process_status.status, constants.ProcStateType.NOT_INSTALLED) diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py index 63f8f1d88..17feb3d96 100644 --- a/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py +++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_proc_status_task.py @@ -9,53 +9,50 @@ specific language governing permissions and limitations under the License. """ -import contextlib from unittest.mock import patch +from apps.mock_data.api_mkd.gse.unit import GSE_PROCESS_NAME +from apps.mock_data.api_mkd.gse.utils import GseApiMockClient +from apps.mock_data.common_unit.host import ( + HOST_MODEL_DATA, + HOST_MODEL_DATA_WITH_AGENT_ID, +) from apps.node_man import constants from apps.node_man.models import Host, ProcessStatus from apps.node_man.periodic_tasks.sync_proc_status_task import ( + sync_proc_status_periodic_task, update_or_create_proc_status, ) -from apps.node_man.tests.test_pericdic_tasks.mock_data import ( - MOCK_HOST, - MOCK_PROC_NAME, - MOCK_PROC_STATUS, -) -from apps.node_man.tests.test_pericdic_tasks.utils import MockClient from apps.utils.unittest.testcase import CustomBaseTestCase class TestSyncProcStatus(CustomBaseTestCase): - @contextlib.contextmanager - def init_db(self): - # 创造一个虚拟主机和虚拟进程信息 - mock_host, _ = Host.objects.get_or_create(**MOCK_HOST) - mock_proc, _ = ProcessStatus.objects.get_or_create(**MOCK_PROC_STATUS) - - yield mock_host + def test_sync_proc_status_periodic_task(self): + Host.objects.create(**HOST_MODEL_DATA) + sync_proc_status_periodic_task() - # 清除相关数据 - Host.objects.get(bk_host_id=MOCK_HOST["bk_host_id"]).delete() - ProcessStatus.objects.get(bk_host_id=MOCK_HOST["bk_host_id"], name=MOCK_PROC_STATUS["name"]).delete() - - @patch("apps.node_man.periodic_tasks.sync_proc_status_task.client_v2", MockClient) + @patch("apps.node_man.periodic_tasks.sync_proc_status_task.GseApi", GseApiMockClient()) def test_update_or_create_proc_status(self, *args, **kwargs): - with self.init_db() as mock_host: - # 测试update_proc_status - # result = query_proc_status("test_proc", [{"ip": "127.0.0.1", "bk_cloud_id": 0}]) - update_or_create_proc_status(None, [mock_host], [MOCK_PROC_NAME], 0) - mock_proc = ProcessStatus.objects.get(bk_host_id=MOCK_HOST["bk_host_id"], name=MOCK_PROC_NAME) - self.assertEqual( - [mock_proc.status, mock_proc.bk_host_id, mock_proc.name], - [constants.ProcStateType.RUNNING, MOCK_HOST["bk_host_id"], MOCK_PROC_NAME], - ) - - # 测试create_proc_status 先删掉存在的proc_status - ProcessStatus.objects.get(bk_host_id=MOCK_HOST["bk_host_id"], name=MOCK_PROC_NAME).delete() - update_or_create_proc_status(None, [mock_host], [MOCK_PROC_NAME], 0) - mock_proc = ProcessStatus.objects.get(bk_host_id=MOCK_HOST["bk_host_id"], name=MOCK_PROC_NAME) - self.assertEqual( - [mock_proc.status, 
mock_proc.bk_host_id, mock_proc.name], - [constants.ProcStateType.RUNNING, MOCK_HOST["bk_host_id"], MOCK_PROC_NAME], - ) + host = Host.objects.create(**HOST_MODEL_DATA) + # 测试新建proc_status + update_or_create_proc_status(None, [host], [GSE_PROCESS_NAME], 0) + mock_proc = ProcessStatus.objects.get(bk_host_id=host.bk_host_id, name=GSE_PROCESS_NAME) + self.assertEqual( + [mock_proc.status, mock_proc.bk_host_id, mock_proc.name], + [constants.ProcStateType.TERMINATED, host.bk_host_id, GSE_PROCESS_NAME], + ) + + # 测试update_proc_status + mock_proc = ProcessStatus.objects.get(bk_host_id=host.bk_host_id, name=GSE_PROCESS_NAME) + mock_proc.status = constants.ProcStateType.MANUAL_STOP + mock_proc.save() + update_or_create_proc_status(None, [host], [GSE_PROCESS_NAME], 0) + self.assertEqual( + [mock_proc.status, mock_proc.bk_host_id, mock_proc.name], + [constants.ProcStateType.MANUAL_STOP, host.bk_host_id, GSE_PROCESS_NAME], + ) + + @patch("apps.node_man.periodic_tasks.sync_proc_status_task.GseApi", GseApiMockClient()) + def test_update_or_create_proc_status_with_agent_id(self, *args, **kwargs): + host = Host.objects.create(**HOST_MODEL_DATA_WITH_AGENT_ID) + update_or_create_proc_status(None, [host], [GSE_PROCESS_NAME], 0) diff --git a/apps/node_man/tests/test_pericdic_tasks/utils.py b/apps/node_man/tests/test_pericdic_tasks/utils.py index 056d20d78..21758503f 100644 --- a/apps/node_man/tests/test_pericdic_tasks/utils.py +++ b/apps/node_man/tests/test_pericdic_tasks/utils.py @@ -47,6 +47,7 @@ def list_resource_pool_hosts(cls, *args, **kwargs): for index in range(count): MOCK_HOST_TMP["bk_host_id"] = index MOCK_HOST_TMP["bk_cloud_id"] = index + MOCK_HOST_TMP["bk_agent_id"] = index MOCK_HOST_TMP["bk_host_innerip"] = MOCK_HOST_TMP["inner_ip"] MOCK_HOST_TMP["bk_host_outerip"] = MOCK_HOST_TMP["outer_ip"] host_list.append(copy.deepcopy(MOCK_HOST_TMP)) diff --git a/common/api/modules/gse.py b/common/api/modules/gse.py index db425fe19..918eba169 100644 --- a/common/api/modules/gse.py +++ b/common/api/modules/gse.py @@ -67,3 +67,15 @@ def __init__(self): module=self.MODULE, description="获取Agent状态", ) + self.get_agent_info_list = DataAPI( + method="POST", + url=GSE_APIGATEWAY_ROOT_V2 + "get_agent_info_list/", + module=self.MODULE, + description="获取Agent版本信息", + ) + self.get_agent_state_list = DataAPI( + method="POST", + url=GSE_APIGATEWAY_ROOT_V2 + "get_agent_state_list/", + module=self.MODULE, + description="GSE2.0 获取Agent状态", + ) diff --git a/config/default.py b/config/default.py index c36dd6f16..3cdd558d3 100644 --- a/config/default.py +++ b/config/default.py @@ -98,6 +98,8 @@ BLUEKING_BIZ_ID = 9991001 # 作业平台版本 JOB_VERSION = os.getenv("JOB_VERSION") or "V3" +# 管控平台平台版本 +GSE_VERSION = os.getenv("GSE_VERSION") or "V1" # 资源池业务ID BK_CMDB_RESOURCE_POOL_BIZ_ID = int(os.getenv("BK_CMDB_RESOURCE_POOL_BIZ_ID", 1)) or 1 BK_CMDB_RESOURCE_POOL_BIZ_NAME = "资源池"
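
For reference, the addressing rule this patch applies throughout (GSE 1.0 keys query results by "{bk_cloud_id}:{inner_ip}", GSE 2.0 by the CMDB-assigned bk_agent_id, selected via settings.GSE_VERSION) can be summarized as a minimal standalone sketch. The helper name build_gse_query_id and the plain-dict hosts below are illustrative assumptions, not part of the patch:

# Minimal sketch (not part of the patch) of the GSE 1.0 / 2.0 host addressing rule.
from typing import Dict, Optional


def build_gse_query_id(host: Dict, gse_version: str = "V1") -> str:
    """Return the identifier used to match GSE query results back to a host.

    GSE 1.0 addresses hosts as "{bk_cloud_id}:{inner_ip}"; GSE 2.0 uses
    bk_agent_id when the host has one and the platform version is not V1.
    """
    bk_agent_id: Optional[str] = host.get("bk_agent_id")
    if bk_agent_id and gse_version != "V1":
        return bk_agent_id
    return f"{host['bk_cloud_id']}:{host['inner_ip']}"


if __name__ == "__main__":
    # Example data only: a host without an AgentID falls back to cloud:ip,
    # a host with an AgentID is addressed by it under GSE 2.0.
    legacy_host = {"bk_cloud_id": 0, "inner_ip": "127.0.0.1", "bk_agent_id": ""}
    v2_host = {"bk_cloud_id": 0, "inner_ip": "127.0.0.1", "bk_agent_id": "example-agent-id-0001"}
    assert build_gse_query_id(legacy_host, gse_version="V2") == "0:127.0.0.1"
    assert build_gse_query_id(v2_host, gse_version="V2") == "example-agent-id-0001"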