From ea16267fa8295e21a82b23a3d9f814ffb70873c0 Mon Sep 17 00:00:00 2001 From: xcwang <1366993017@qq.com> Date: Tue, 14 Jun 2022 22:09:44 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20JOB=20API=20=E9=80=82=E9=85=8D=20(cl?= =?UTF-8?q?osed=20#785)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/agent/tasks.py | 4 +- .../agent_new/check_policy_gse_to_proxy.py | 9 +- .../collections/agent_new/install.py | 16 +++- apps/backend/components/collections/job.py | 44 ++++++++-- apps/backend/components/collections/plugin.py | 60 ++++++++----- apps/backend/tests/api/test_job.py | 3 + .../components/collections/agent_new/base.py | 1 + .../test_check_policy_gse_to_proxy.py | 2 + .../periodic_tasks/test_update_proxy_file.py | 32 ++++--- apps/core/files/base.py | 4 +- .../periodic_tasks/update_proxy_file.py | 87 ++++++++++++++----- apps/node_man/periodic_tasks/utils.py | 36 +++++--- 12 files changed, 215 insertions(+), 83 deletions(-) diff --git a/apps/backend/agent/tasks.py b/apps/backend/agent/tasks.py index 66e1ff144..b3e50d25f 100644 --- a/apps/backend/agent/tasks.py +++ b/apps/backend/agent/tasks.py @@ -111,7 +111,9 @@ def arg_parser() -> argparse.ArgumentParser: "account": "root", "is_param_sensitive": 1, "script_type": 4, - "target_server": {"ip_list": [{"bk_cloud_id": proxy.bk_cloud_id, "ip": proxy.inner_ip}]}, + "target_server": { + "ip_list": [{"bk_cloud_id": proxy.bk_cloud_id, "ip": proxy.inner_ip, "host_id": proxy.bk_host_id}] + }, } data = client_v2.job.fast_execute_script(kwargs) job_instance_id = data["job_instance_id"] diff --git a/apps/backend/components/collections/agent_new/check_policy_gse_to_proxy.py b/apps/backend/components/collections/agent_new/check_policy_gse_to_proxy.py index 5eab699ed..c7f798b72 100644 --- a/apps/backend/components/collections/agent_new/check_policy_gse_to_proxy.py +++ b/apps/backend/components/collections/agent_new/check_policy_gse_to_proxy.py @@ -53,7 +53,14 @@ def 
script_name(self): def get_target_servers(self, data, common_data: CommonData, host: models.Host) -> List[Dict[str, Any]]: # 取接入点 ap = common_data.ap_id_obj_map[host.ap_id] - return [{"bk_cloud_id": constants.DEFAULT_CLOUD, "ip": bt_server["inner_ip"]} for bt_server in ap.btfileserver] + return [ + { + "bk_cloud_id": constants.DEFAULT_CLOUD, + "ip": bt_server["inner_ip"], + "bk_host_id": bt_server.get("bk_host_id", ""), + } + for bt_server in ap.btfileserver + ] def get_script_content(self, data, common_data: AgentCommonData, host: models.Host) -> str: port_config = common_data.host_id__ap_map[host.bk_host_id].port_config diff --git a/apps/backend/components/collections/agent_new/install.py b/apps/backend/components/collections/agent_new/install.py index 60508ed44..9378f5c68 100644 --- a/apps/backend/components/collections/agent_new/install.py +++ b/apps/backend/components/collections/agent_new/install.py @@ -15,7 +15,7 @@ import socket import time from collections import defaultdict -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Union from django.conf import settings from django.utils.translation import ugettext_lazy as _ @@ -386,10 +386,22 @@ def execute_job_commands(self, sub_inst_id, installation_tool: InstallationTools # 使用全业务执行作业 bk_biz_id = settings.BLUEKING_BIZ_ID + target_server: Dict[str, List[Union[int, Dict[str, Union[int, str]]]]] = ( + {"host_id_list": [jump_server.bk_host_id]} + if settings.BKAPP_ENABLE_DHCP + else { + "ip_list": [ + { + "ip": jump_server.inner_ip, + "bk_cloud_id": jump_server.bk_cloud_id, + } + ] + } + ) kwargs = { "bk_biz_id": bk_biz_id, "task_name": f"NODEMAN_{sub_inst_id}_{self.__class__.__name__}", - "target_server": {"ip_list": [{"ip": jump_server.inner_ip, "bk_cloud_id": jump_server.bk_cloud_id}]}, + "target_server": target_server, "timeout": constants.JOB_TIMEOUT, "account_alias": settings.BACKEND_UNIX_ACCOUNT, "script_language": 
constants.ScriptLanguageType.PYTHON.value, diff --git a/apps/backend/components/collections/job.py b/apps/backend/components/collections/job.py index 7ccbd0f95..90a543508 100644 --- a/apps/backend/components/collections/job.py +++ b/apps/backend/components/collections/job.py @@ -81,10 +81,12 @@ def request_single_job_and_create_map( self, job_func, subscription_instance_id: [int, list], subscription_id, job_params: dict ): """请求作业平台并创建与订阅实例的映射""" - if not job_params["target_server"]["ip_list"]: - # 没有IP列表时,不发起请求 - return [] + host_interaction_from = ("ip_list", "host_id_list")[settings.BKAPP_ENABLE_DHCP] + host_interaction_data_list = job_params["target_server"].get(host_interaction_from, []) + if not host_interaction_data_list: + return [] + job_params["target_server"] = {host_interaction_from: host_interaction_data_list} # 补充作业平台通用参数 if not job_params.get("os_type"): job_params["os_type"] = self.DEFAULT_OS_TYPE @@ -204,7 +206,7 @@ def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) -> # 构造主机作业状态映射表 cloud_ip_status_map: Dict[str, Dict] = {} for ip_result in ip_results["step_instance_list"][0].get("step_ip_result_list") or []: - cloud_ip_status_map[f'{ip_result["bk_cloud_id"]}-{ip_result["ip"]}'] = ip_result + cloud_ip_status_map[f'{ip_result["bk_cloud_id"]}-{ip_result["ip"]}-{ip_result["bk_host_id"]}'] = ip_result succeed_sub_inst_ids: List[int] = [] subscription_instances = models.SubscriptionInstanceRecord.objects.filter( @@ -213,7 +215,7 @@ def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) -> bk_host_ids = [sub_inst.instance_info["host"]["bk_host_id"] for sub_inst in subscription_instances] host_id__cloud_ip_map = { - host_info["bk_host_id"]: f"{host_info['bk_cloud_id']}-{host_info['inner_ip']}" + host_info["bk_host_id"]: f"{host_info['bk_cloud_id']}-{host_info['inner_ip']}-{host_info['bk_host_id']}" for host_info in models.Host.objects.filter(bk_host_id__in=bk_host_ids).values( "bk_host_id", "inner_ip", 
"bk_cloud_id" ) @@ -392,13 +394,17 @@ def _execute(self, data, parent_data, common_data: CommonData): if md5_key in multi_job_params_map: multi_job_params_map[md5_key]["subscription_instance_id"].append(sub_inst.id) multi_job_params_map[md5_key]["job_params"]["target_server"]["ip_list"].extend(target_servers) + multi_job_params_map[md5_key]["job_params"]["target_server"]["ip_list"].append( + {"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip} + ) + multi_job_params_map[md5_key]["job_params"]["target_server"]["host_id_list"].append(host_obj.bk_host_id) else: multi_job_params_map[md5_key] = { "job_func": JobApi.fast_execute_script, "subscription_instance_id": [sub_inst.id], "subscription_id": common_data.subscription.id, "job_params": { - "target_server": {"ip_list": target_servers}, + "target_server": {"ip_list": target_servers, "host_id_list": [host_obj.bk_host_id]}, "script_content": script_content, "script_param": script_param, "timeout": timeout, @@ -436,7 +442,7 @@ def get_target_servers(self, data, common_data: CommonData, host: models.Host) - :param host: 主机类型 :return: 目标服务器 """ - return [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}] + return [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip, "bk_host_id": host.bk_host_id}] class JobTransferFileService(JobV3BaseService, metaclass=abc.ABCMeta): @@ -464,13 +470,22 @@ def _execute(self, data, parent_data, common_data: CommonData): multi_job_params_map[md5_key]["job_params"]["target_server"]["ip_list"].append( {"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip} ) + multi_job_params_map[md5_key]["job_params"]["target_server"]["host_id_list"].append(host_obj.bk_host_id) else: multi_job_params_map[md5_key] = { "job_func": JobApi.fast_transfer_file, "subscription_instance_id": [sub_inst.id], "subscription_id": common_data.subscription.id, "job_params": { - "target_server": {"ip_list": [{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}]}, + "target_server": { + 
"ip_list": [ + { + "bk_cloud_id": host_obj.bk_cloud_id, + "ip": host_obj.inner_ip, + } + ], + "host_id_list": [host_obj.bk_host_id], + }, "file_target_path": file_target_path, "file_source_list": [{"file_list": file_list}], "timeout": timeout, @@ -538,6 +553,9 @@ def _execute(self, data, parent_data, common_data: CommonData): multi_job_params_map[job_unique_key]["job_params"]["target_server"]["ip_list"].append( {"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip} ) + multi_job_params_map[job_unique_key]["job_params"]["target_server"]["host_id_list"].append( + host_obj.bk_host_id + ) else: file_source_list = [] for config_info in config_info_list: @@ -549,7 +567,15 @@ def _execute(self, data, parent_data, common_data: CommonData): "subscription_instance_id": [sub_inst.id], "subscription_id": common_data.subscription.id, "job_params": { - "target_server": {"ip_list": [{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}]}, + "target_server": { + "ip_list": [ + { + "bk_cloud_id": host_obj.bk_cloud_id, + "ip": host_obj.inner_ip, + } + ], + "host_id_list": [host_obj.bk_host_id], + }, "file_target_path": file_target_path, "file_list": file_source_list, "timeout": timeout, diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index 82e8c755d..13f8804a5 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -15,7 +15,7 @@ import os from collections import defaultdict from functools import reduce -from typing import Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Set, Tuple, Union from django.conf import settings from django.db import IntegrityError @@ -535,6 +535,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): os_type = host.os_type.lower() or constants.OsType.LINUX.lower() package_path = "/".join((nginx_path, os_type, host.cpu_arch, package.pkg_name)) 
jobs[package_path]["ip_list"].append({"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}) + jobs[package_path]["host_id_list"].append(host.bk_host_id) jobs[package_path]["subscription_instance_ids"].append(subscription_instance.id) jobs[package_path]["temp_path"] = agent_config["temp_path"] jobs[package_path]["os_type"] = host.os_type @@ -544,7 +545,6 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): for package_path, job in jobs.items(): file_list = [package_path] file_list = self.append_extra_files(job["os_type"], file_list, nginx_path) - multi_job_params.append( { "job_func": JobApi.fast_transfer_file, @@ -554,7 +554,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): "file_target_path": job["temp_path"], "file_source_list": [{"file_list": file_list}], "os_type": job["os_type"], - "target_server": {"ip_list": job["ip_list"]}, + "target_server": {"ip_list": job["ip_list"], "host_id_list": job["host_id_list"]}, }, } ) @@ -614,7 +614,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): group_id_instance_map = common_data.group_id_instance_map # 批量请求作业平台的参数 - multi_job_params_map = {} + multi_job_params_map: Dict[str, Dict[str, Any]] = {} for process_status in process_statuses: if self.need_skipped(process_status, common_data): continue @@ -639,13 +639,17 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): multi_job_params_map[key]["job_params"]["target_server"]["ip_list"].append( {"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip} ) + multi_job_params_map[key]["job_params"]["target_server"]["host_id_list"].append(host.bk_host_id) else: multi_job_params_map[key] = { "job_func": JobApi.fast_execute_script, "subscription_instance_id": [subscription_instance_id], "subscription_id": common_data.subscription.id, "job_params": { - "target_server": {"ip_list": [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}]}, + "target_server": { + "ip_list": [{"bk_cloud_id": 
host.bk_cloud_id, "ip": host.inner_ip}], + "host_id_list": [host.bk_host_id], + }, "script_content": script_content, "script_param": script_param, "timeout": timeout, @@ -816,17 +820,19 @@ def allocate_port_to_process_status( bk_host_id = process_status.bk_host_id # 查询并解析该主机已被占用的端口号 - result = JobApi.get_job_instance_ip_log( - { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, - "bk_scope_id": settings.BLUEKING_BIZ_ID, - "job_instance_id": job_instance_id, - "step_instance_id": step_instance_id, - "bk_cloud_id": host.bk_cloud_id, - "ip": host.inner_ip, - } + instance_log_base_params: Dict[str, Union[str, int]] = { + "bk_biz_id": settings.BLUEKING_BIZ_ID, + "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, + "bk_scope_id": settings.BLUEKING_BIZ_ID, + "job_instance_id": job_instance_id, + "step_instance_id": step_instance_id, + } + host_interaction_params: Dict[str, Union[str, int]] = ( + {"bk_host_id": host.bk_host_id} + if settings.BKAPP_ENABLE_DHCP + else {"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip} ) + result = JobApi.get_job_instance_ip_log({**instance_log_base_params, **host_interaction_params}) used_ports = self.parse_used_port(result.get("log_content", "")) port_range_list = models.ProcControl.parse_port_range(plugin_control.port_range) queryset = models.ProcessStatus.objects.filter(bk_host_id=bk_host_id) @@ -931,7 +937,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): subscription_step = models.SubscriptionStep.objects.get(id=subscription_step_id) # 组装调用作业平台的参数 - multi_job_params_map = {} + multi_job_params_map: Dict[str, Dict[str, Any]] = {} for process_status in process_statuses: target_bk_host_id = process_status.bk_host_id subscription_instance = group_id_instance_map.get(process_status.group_id) @@ -967,6 +973,9 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): multi_job_params_map[key]["job_params"]["target_server"]["ip_list"].append( 
{"bk_cloud_id": target_host.bk_cloud_id, "ip": target_host.inner_ip} ) + multi_job_params_map[key]["job_params"]["target_server"]["host_id_list"].append( + target_host.bk_host_id + ) else: multi_job_params_map[key] = { "job_func": JobApi.push_config_file, @@ -975,7 +984,13 @@ def _execute(self, data, parent_data, common_data: PluginCommonData): "job_params": { "os_type": target_host.os_type, "target_server": { - "ip_list": [{"bk_cloud_id": target_host.bk_cloud_id, "ip": target_host.inner_ip}] + "ip_list": [ + { + "bk_cloud_id": target_host.bk_cloud_id, + "ip": target_host.inner_ip, + } + ], + "host_id_list": [target_host.bk_host_id], }, "file_target_path": file_target_path, "file_list": [{"file_name": file_name, "content": process_parms(file_content)}], @@ -1331,20 +1346,23 @@ def _schedule(self, data, parent_data, callback_data=None): ) # 调试插件时仅有一个IP,以下取值方式与作业平台API文档一致,不会抛出 IndexError/KeyError 的异常 step_instance_id = result["step_instance_list"][0]["step_instance_id"] + bk_host_id = result["step_instance_list"][0]["step_ip_result_list"][0]["bk_host_id"] ip = result["step_instance_list"][0]["step_ip_result_list"][0]["ip"] bk_cloud_id = result["step_instance_list"][0]["step_ip_result_list"][0]["bk_cloud_id"] - params = { + instance_log_base_params: Dict[str, Union[str, int]] = { "job_instance_id": job_instance_id, "bk_biz_id": settings.BLUEKING_BIZ_ID, "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, "bk_scope_id": settings.BLUEKING_BIZ_ID, "bk_username": settings.BACKEND_JOB_OPERATOR, "step_instance_id": step_instance_id, - "ip": ip, - "bk_cloud_id": bk_cloud_id, + "bk_host_id": bk_host_id, } - task_result = JobApi.get_job_instance_ip_log(params) + host_interaction_params: Dict[str, Union[str, int]] = ( + {"bk_host_id": bk_host_id} if settings.BKAPP_ENABLE_DHCP else {"ip": ip, "bk_cloud_id": bk_cloud_id} + ) + task_result = JobApi.get_job_instance_ip_log({**instance_log_base_params, **host_interaction_params}) # 只写入新的日志,保证轮询过程中不会重复写入job的日志 last_logs = 
data.get_one_of_outputs("last_logs", "") diff --git a/apps/backend/tests/api/test_job.py b/apps/backend/tests/api/test_job.py index a4dca9e78..bb6bf8df7 100644 --- a/apps/backend/tests/api/test_job.py +++ b/apps/backend/tests/api/test_job.py @@ -153,6 +153,7 @@ class TestJob(TestCase): { "total_time": 0.001, "ip": "127.0.0.1", + "host_id": 1, "start_time": "2019-08-12 19:04:07 +0800", "log_content": "\u30102019-08-12 19:04:07.757\u3011 FileName\uff1a/data/dev_" "pipeline_unit_test FileSize\uff1a9.0 Byte State\uff1adownload" @@ -212,6 +213,7 @@ def test_fast_execute_script(self): assert len(task_result["success"]) == 1 assert set(task_result["success"][0].keys()) == { "ip", + "host_id", "log_content", "bk_cloud_id", "error_code", @@ -236,6 +238,7 @@ def test_fast_push_file(self): assert len(task_result["success"]) == 1 assert set(task_result["success"][0].keys()) == { "ip", + "host_id", "log_content", "bk_cloud_id", "error_code", diff --git a/apps/backend/tests/components/collections/agent_new/base.py b/apps/backend/tests/components/collections/agent_new/base.py index 19f0dd295..6a5cff3cf 100644 --- a/apps/backend/tests/components/collections/agent_new/base.py +++ b/apps/backend/tests/components/collections/agent_new/base.py @@ -67,6 +67,7 @@ def structure_mock_data(cls): info_to_be_updated = { "ip": host_obj.inner_ip, "bk_cloud_id": host_obj.bk_cloud_id, + "bk_host_id": host_obj.bk_host_id, "status": constants.BkJobStatus.SUCCEEDED, } step_ip_result_list.append(dict(ChainMap(info_to_be_updated, step_ip_result))) diff --git a/apps/backend/tests/components/collections/agent_new/test_check_policy_gse_to_proxy.py b/apps/backend/tests/components/collections/agent_new/test_check_policy_gse_to_proxy.py index f09c97cb3..0621810e3 100644 --- a/apps/backend/tests/components/collections/agent_new/test_check_policy_gse_to_proxy.py +++ b/apps/backend/tests/components/collections/agent_new/test_check_policy_gse_to_proxy.py @@ -11,6 +11,7 @@ import base64 from typing import Dict 
+from django.test import override_settings from django.utils.translation import ugettext_lazy as _ from apps.backend.components.collections.agent_new import ( @@ -31,6 +32,7 @@ def get_default_case_name(cls) -> str: def component_cls(self): return components.CheckPolicyGseToProxyComponent + @override_settings(BKAPP_ENABLE_DHCP=False) def tearDown(self) -> None: record = self.job_api_mock_client.call_recorder.record host_key__script_content_map: Dict[str, str] = {} diff --git a/apps/backend/tests/periodic_tasks/test_update_proxy_file.py b/apps/backend/tests/periodic_tasks/test_update_proxy_file.py index bd38dd9ef..ea8dbfb13 100644 --- a/apps/backend/tests/periodic_tasks/test_update_proxy_file.py +++ b/apps/backend/tests/periodic_tasks/test_update_proxy_file.py @@ -18,23 +18,20 @@ import mock from django.conf import settings from django.core.management import call_command +from django.test import override_settings from mock import patch +from apps.adapters.api import gse from apps.core.files import constants as core_const from apps.mock_data import api_mkd, utils +from apps.mock_data.api_mkd.gse.utils import GseApiMockClient from apps.node_man import constants from apps.node_man.models import AccessPoint, InstallChannel from apps.node_man.tests.utils import create_ap, create_cloud_area, create_host from apps.utils import files from apps.utils.files import md5sum from apps.utils.unittest.testcase import CustomBaseTestCase - - -class GseMockClient: - def __init__(self, get_agent_status_return=None, get_agent_info_return=None): - self.gse = mock.MagicMock() - self.gse.get_agent_status = mock.MagicMock(return_value=get_agent_status_return) - self.gse.get_agent_info = mock.MagicMock(return_value=get_agent_info_return) +from env.constants import GseVersion class JobDemandMock: @@ -72,7 +69,9 @@ def __init__(self, get_file_md5_return=None, fast_transfer_file_return=None): POLL_RESULT = { "is_finished": True, "task_result": { - "success": [{"ip": utils.DEFAULT_IP, 
"bk_cloud_id": constants.DEFAULT_CLOUD, "log_content": ""}], + "success": [ + {"ip": utils.DEFAULT_IP, "bk_cloud_id": constants.DEFAULT_CLOUD, "log_content": "", "bk_host_id": 1} + ], "pending": [], "failed": [], }, @@ -147,15 +146,17 @@ def setUpTestData(cls): cls.JOB_MOCK_CLIENT = api_mkd.job.utils.JobApiMockClient( utils.MockReturn(return_type=utils.MockReturnType.RETURN_VALUE.value, return_obj=FAST_EXECUTE_SCRIPT) ) - cls.GSE_MOCK_CLIENT = GseMockClient( - get_agent_status_return=GET_AGENT_STATUS, - ) + cls.JOB_DEMAND_MOCK_CLIENT = JobDemandMock(poll_task_result_return=POLL_RESULT) def setUp(self) -> None: create_cloud_area(number=5, creator="admin") patch("apps.node_man.periodic_tasks.update_proxy_file.JobApi", self.JOB_MOCK_CLIENT).start() + @patch( + "apps.node_man.periodic_tasks.update_proxy_file.GseApiHelper", + gse.get_gse_api_helper(GseVersion.V2.value)(GseVersion.V2.value, GseApiMockClient()), + ) def test_file_system_update(self): # 不存在proxy self.assertIsNone(call_command("update_proxy_file")) @@ -164,20 +165,20 @@ def test_file_system_update(self): GET_AGENT_STATUS[f"{constants.DEFAULT_CLOUD}:{utils.DEFAULT_IP}"][ "bk_agent_alive" ] = constants.BkAgentStatus.NOT_ALIVE.value - patch("apps.node_man.periodic_tasks.update_proxy_file.client_v2", self.GSE_MOCK_CLIENT).start() self.init_proxy_host(alive_number=0, unknown_number=1) self.assertIsNone(call_command("update_proxy_file")) OVERWRITE_OBJ__KV_MAP["settings"]["DOWNLOAD_PATH"] = files.mk_and_return_tmpdir() with self.settings( DOWNLOAD_PATH=OVERWRITE_OBJ__KV_MAP["settings"]["DOWNLOAD_PATH"], + GSE_VERSION=GseVersion.V2.value, + BKAPP_ENABLE_DHCP=True, STORAGE_TYPE=core_const.StorageType.FILE_SYSTEM.value, ): local_files_md5_map: Dict[str, str] = {} GET_AGENT_STATUS[f"{constants.DEFAULT_CLOUD}:{utils.DEFAULT_IP}"][ "bk_agent_alive" ] = constants.BkAgentStatus.ALIVE.value - patch("apps.node_man.periodic_tasks.update_proxy_file.client_v2", self.GSE_MOCK_CLIENT).start() # 创建接入点 self.init_ap_db() @@ -220,6 
+221,11 @@ def test_file_system_update(self): self.assertIsNone(call_command("update_proxy_file")) shutil.rmtree(settings.DOWNLOAD_PATH) + @override_settings(GSE_VERSION=GseVersion.V1.value) + @patch( + "apps.node_man.periodic_tasks.update_proxy_file.GseApiHelper", + gse.get_gse_api_helper(GseVersion.V1.value)(GseVersion.V1.value, GseApiMockClient()), + ) def test_blueking_artifactory_update(self): with self.settings( DOWNLOAD_PATH=OVERWRITE_OBJ__KV_MAP["settings"]["DOWNLOAD_PATH"], diff --git a/apps/core/files/base.py b/apps/core/files/base.py index af529adac..f9b20061c 100644 --- a/apps/core/files/base.py +++ b/apps/core/files/base.py @@ -182,8 +182,8 @@ def fast_transfer_file( target_server 示例 { "ip_list": [ - {"bk_cloud_id": 0, "ip": "127.0.0.1"}, - {"bk_cloud_id": 0, "ip": "127.0.0.2"} + {"bk_cloud_id": 0, "ip": "127.0.0.1", "host_id":1}, + {"bk_cloud_id": 0, "ip": "127.0.0.2", "host_id":2}, ] } :return: 作业实例ID diff --git a/apps/node_man/periodic_tasks/update_proxy_file.py b/apps/node_man/periodic_tasks/update_proxy_file.py index 6c25ad32a..139598b47 100644 --- a/apps/node_man/periodic_tasks/update_proxy_file.py +++ b/apps/node_man/periodic_tasks/update_proxy_file.py @@ -14,15 +14,15 @@ import time from collections import defaultdict from json import JSONDecodeError -from typing import Any, Dict, List +from typing import Dict, List, Union from celery.schedules import crontab from celery.task import periodic_task from django.conf import settings from django.db.models import Q +from apps.adapters.api.gse import GseApiHelper from apps.backend.api.errors import JobPollTimeout -from apps.component.esbclient import client_v2 from apps.core.files.storage import get_storage from apps.node_man import constants from apps.node_man.models import AccessPoint, Host, InstallChannel @@ -38,30 +38,46 @@ ) def update_proxy_files(): alive_hosts: List[Dict[str, str]] = [] - channel_hosts: List[Dict[str, Any]] = [] - ap_nginx_path_map: Dict[Dict[str, int]] = defaultdict(list) + 
ap_nginx_path_map: Dict[str, List[int]] = defaultdict(list) host_gby_ap_id: Dict[int, List[Dict[str, str]]] = defaultdict(list) - proxy_hosts = [ - {"ip": host_info["inner_ip"], "bk_cloud_id": host_info["bk_cloud_id"]} - for host_info in Host.objects.filter(node_type=constants.NodeType.PROXY).values("inner_ip", "bk_cloud_id") - ] + total_update_host_conditions = Q() + total_update_host_conditions.connector = "OR" + # TODO 通道主机数据需要迁移为 Host-ID channel_host_queryset = InstallChannel.objects.all().values("bk_cloud_id", "jump_servers") for channel_info in channel_host_queryset: for jump_server in channel_info["jump_servers"]: - channel_hosts.append({"bk_cloud_id": channel_info["bk_cloud_id"], "ip": jump_server}) + total_update_host_conditions.children.append( + Q(bk_cloud_id=channel_info["bk_cloud_id"], inner_ip=jump_server) + ) + total_update_host_conditions.children.append(Q(node_type=constants.NodeType.PROXY)) - total_update_hosts = proxy_hosts + channel_hosts + total_update_hosts_queryset = Host.objects.filter(total_update_host_conditions) + total_update_hosts: List[Dict[str, Union[str, int]]] = [ + {"ip": host_info["inner_ip"], "bk_cloud_id": host_info["bk_cloud_id"], "bk_agent_id": host_info["bk_agent_id"]} + for host_info in total_update_hosts_queryset.values("inner_ip", "bk_cloud_id", "bk_agent_id") + ] if not total_update_hosts: return # 实时查询主机状态 - agent_statuses = client_v2.gse.get_agent_status({"hosts": total_update_hosts}) - for __, host_info in agent_statuses.items(): - if host_info["bk_agent_alive"] == constants.BkAgentStatus.ALIVE.value: - alive_hosts.append({"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]}) + agent_statuses: Dict[str, Dict] = GseApiHelper.list_agent_state(total_update_hosts) + + for update_host_obj in total_update_hosts_queryset: + agent_id = GseApiHelper.get_agent_id(update_host_obj) + agent_state_info = agent_statuses.get(agent_id, {"version": "", "bk_agent_alive": None}) + agent_status = 
constants.PROC_STATUS_DICT.get(agent_state_info["bk_agent_alive"], None) + if agent_status == constants.ProcStateType.RUNNING: + alive_hosts.append( + { + "ip": update_host_obj.inner_ip, + "bk_cloud_id": update_host_obj.bk_cloud_id, + "bk_host_id": update_host_obj.bk_host_id, + } + ) + if not alive_hosts: return @@ -78,11 +94,21 @@ def update_proxy_files(): else: host_query_conditions = Q() host_query_conditions.connector = "OR" - for host in alive_hosts: - host_query_conditions.children.append((Q(bk_cloud_id=host["bk_cloud_id"], inner_ip=host["ip"]))) + for update_host_obj in alive_hosts: + host_query_conditions.children.append( + ( + Q( + bk_cloud_id=update_host_obj["bk_cloud_id"], + inner_ip=update_host_obj["ip"], + bk_host_id=update_host_obj["bk_host_id"], + ) + ) + ) host_queryset = Host.objects.filter(host_query_conditions) for host_obj in host_queryset: - host_gby_ap_id[host_obj.ap_id].append({"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}) + host_gby_ap_id[host_obj.ap_id].append( + {"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip, "bk_host_id": host_obj.bk_host_id} + ) # 以nginx_path为维度,分批处理 for nginx_path, ap_ids in ap_nginx_path_map.items(): same_path_hosts = [host_info for ap_id in ap_ids for host_info in host_gby_ap_id[ap_id]] @@ -90,7 +116,7 @@ def update_proxy_files(): correct_file_action(nginx_path, same_path_hosts, storage) -def correct_file_action(download_path: str, hosts: List[Dict[str, str]], storage): +def correct_file_action(download_path: str, hosts: List[Dict[str, Union[str, int]]], storage): local_file__md5_map: Dict[str, str] = {} lookup_update_host_list: List[Dict[str, str]] = [] file_names = [file_name for file_set in constants.FILES_TO_PUSH_TO_PROXY for file_name in file_set["files"]] @@ -153,7 +179,7 @@ def md5(file_name): "account_alias": constants.LINUX_ACCOUNT, "is_param_sensitive": 1, "script_language": 4, - "target_server": {"ip_list": hosts}, + "target_server": target_server_generator(hosts), } 
job_instance_id = JobApi.fast_execute_script(kwargs)["job_instance_id"] @@ -178,9 +204,12 @@ def md5(file_name): continue for name, file_md5 in local_file__md5_map.items(): if name not in proxy_file_md5_map or proxy_file_md5_map[name] != file_md5: - lookup_update_host_list.append( - {"ip": proxy_task_result["ip"], "bk_cloud_id": proxy_task_result["bk_cloud_id"]} - ) + if settings.BKAPP_ENABLE_DHCP: + lookup_update_host_list.append({"bk_host_id": proxy_task_result["bk_host_id"]}) + else: + lookup_update_host_list.append( + {"ip": proxy_task_result["ip"], "bk_cloud_id": proxy_task_result["bk_cloud_id"]} + ) if not lookup_update_host_list: logger.info("There are no files with local differences on proxy or channel servers that need to be updated") @@ -193,7 +222,7 @@ def md5(file_name): account_alias=constants.LINUX_ACCOUNT, file_target_path=download_path, file_source_list=[{"file_list": [os.path.join(download_path, file) for file in files]}], - target_server={"ip_list": lookup_update_host_list}, + target_server=target_server_generator(lookup_update_host_list), ) time.sleep(5) transfer_result = {"task_result": {"pending": lookup_update_host_list, "failed": []}} @@ -210,3 +239,15 @@ def md5(file_name): f"{transfer_result['task_result']['pending']}, failed hosts: {transfer_result['task_result']['failed']}" f"job_instance_id:{job_transfer_id}" ) + + +def target_server_generator(host_info_list: List[Dict[str, Union[str, int]]]): + if settings.BKAPP_ENABLE_DHCP: + host_interaction_params: Dict[str, List[Dict[str, int]]] = { + "host_id_list": [{"bk_host_id": host["bk_host_id"]} for host in host_info_list] + } + else: + host_interaction_params: Dict[str, List[Dict[str, Union[int, str]]]] = { + "ip_list": [{"ip": host["ip"], "bk_cloud_id": host["bk_cloud_id"]} for host in host_info_list] + } + return host_interaction_params diff --git a/apps/node_man/periodic_tasks/utils.py b/apps/node_man/periodic_tasks/utils.py index 862d77ff1..16dccbd7a 100644 --- 
a/apps/node_man/periodic_tasks/utils.py +++ b/apps/node_man/periodic_tasks/utils.py @@ -11,6 +11,7 @@ import logging import time from collections import defaultdict +from typing import Dict, Union import ujson as json from django.conf import settings @@ -60,6 +61,7 @@ def get_task_result(cls, job_instance_id: int): { 'ip': 127.0.0.1, 'bk_cloud_id': 0, + 'host_id': 1, 'log_content': 'xx', } ], @@ -80,7 +82,10 @@ def get_task_result(cls, job_instance_id: int): host_infos__gby_job_status = defaultdict(list) step_instance_id = job_status["step_instance_list"][0]["step_instance_id"] for instance in job_status["step_instance_list"][0]["step_ip_result_list"]: - host_info = {"ip": instance["ip"], "bk_cloud_id": instance["bk_cloud_id"]} + if not settings.BKAPP_ENABLE_DHCP: + host_info = {"ip": instance["ip"], "bk_cloud_id": instance["bk_cloud_id"]} + else: + host_info = {"bk_host_id": instance["bk_host_id"]} host_infos__gby_job_status[instance["status"]].append(host_info) logger.info( "user->[{}] called api->[{}] and got response->[{}].".format( @@ -105,22 +110,31 @@ def get_task_result(cls, job_instance_id: int): key = "failed" for host in hosts: - log_params = { + base_log_params = { "job_instance_id": job_instance_id, "bk_biz_id": settings.BLUEKING_BIZ_ID, "bk_scope_type": constants.BkJobScopeType.BIZ_SET.value, "bk_scope_id": settings.BLUEKING_BIZ_ID, "bk_username": settings.BACKEND_JOB_OPERATOR, "step_instance_id": step_instance_id, - "ip": host["ip"], - "bk_cloud_id": host["bk_cloud_id"], } - log_result = JobApi.get_job_instance_ip_log(log_params) - task_result[key].append( - { - "ip": host["ip"], - "bk_cloud_id": host["bk_cloud_id"], - "log_content": log_result["log_content"], - } + host_interaction_params: Dict[str, Union[str, int]] = ( + {"bk_host_id": host["bk_host_id"]} + if settings.BKAPP_ENABLE_DHCP + else {"ip": host["ip"], "bk_cloud_id": host["bk_cloud_id"]} ) + log_result = JobApi.get_job_instance_ip_log({**base_log_params, **host_interaction_params}) + if 
not settings.BKAPP_ENABLE_DHCP: + task_result[key].append( + { + "ip": host["ip"], + "bk_cloud_id": host["bk_cloud_id"], + "log_content": log_result["log_content"], + } + ) + else: + task_result[key].append( + {"bk_host_id": host["bk_host_id"], "log_content": log_result["log_content"]} + ) + return {"is_finished": is_finished, "task_result": task_result}