Skip to content

Commit

Permalink
feature: P-Agent 安装时更新 Agent 2.0 安装包到 Proxy (close TencentBlueKing#1315)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzhw8 authored and ZhuoZhuoCrayon committed Dec 22, 2022
1 parent 6c8e85e commit d21c3c3
Show file tree
Hide file tree
Showing 19 changed files with 456 additions and 87 deletions.
13 changes: 12 additions & 1 deletion apps/backend/agent/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ def update_process_status(cls, status: str, name=components.UpdateProcessStatusC

@classmethod
def push_files_to_proxy(cls, file: Dict[str, Any]):
"""下发文件到 Proxy"""
"""
下发文件到 Proxy
"""
act = AgentServiceActivity(component_code=components.PushFilesToProxyComponent.code, name=file["name"])
act.component.inputs.file_list = Var(type=Var.PLAIN, value=file["files"])
act.component.inputs.from_type = Var(type=Var.PLAIN, value=file.get("from_type"))
Expand Down Expand Up @@ -251,3 +253,12 @@ def push_host_identifier(cls):
name=components.PushIdentifierHostsComponent.name,
)
return act

@classmethod
def push_agent_pkg_to_proxy(cls):
    """Build the activity that distributes the Agent installation package to Proxy."""
    component = components.PushAgentPkgToProxyComponent
    return AgentServiceActivity(component_code=component.code, name=component.name)
15 changes: 10 additions & 5 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,23 @@ def get_general_node_type(cls, node_type: str) -> str:
]

@classmethod
def get_agent_upgrade_pkg_name(cls, common_data: "AgentCommonData", host: models.Host) -> str:
def get_agent_pkg_name(cls, common_data: "AgentCommonData", host: models.Host, is_upgrade: bool = False) -> str:
"""
获取 Agent 升级包名称
:param common_data:
:param host:
:param common_data: AgentCommonData
:param host: models.Host
:param is_upgrade: bool 是否升级包
:return:
"""
# GSE2.0 安装包和升级包复用同一个包
package_type = ("client", "proxy")[host.node_type == constants.NodeType.PROXY]
agent_step_adapter = common_data.agent_step_adapter
if not agent_step_adapter.is_legacy:
setup_info = agent_step_adapter.get_setup_info()
return f"{setup_info.name}-{setup_info.version}.tgz"

# GSE1.0 的升级包是独立的,添加了 _upgrade 后缀
pkg_suffix = "_upgrade" if is_upgrade else ""
if host.os_version:
major_version_number = None
if host.os_type == constants.OsType.AIX:
Expand All @@ -128,10 +133,10 @@ def get_agent_upgrade_pkg_name(cls, common_data: "AgentCommonData", host: models
if not major_version_number:
raise OsVersionPackageValidationError(os_version=host.os_version, os_type=host.os_type)
agent_upgrade_package_name = (
f"gse_{package_type}-{host.os_type.lower()}{major_version_number}-{host.cpu_arch}_upgrade.tgz"
f"gse_{package_type}-{host.os_type.lower()}{major_version_number}-{host.cpu_arch}{pkg_suffix}.tgz"
)
else:
agent_upgrade_package_name = f"gse_{package_type}-{host.os_type.lower()}-{host.cpu_arch}_upgrade.tgz"
agent_upgrade_package_name = f"gse_{package_type}-{host.os_type.lower()}-{host.cpu_arch}{pkg_suffix}.tgz"

return agent_upgrade_package_name

Expand Down
7 changes: 7 additions & 0 deletions apps/backend/components/collections/agent_new/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .get_agent_status import GetAgentStatusService
from .install import InstallService
from .install_plugins import InstallPluginsService
from .push_agent_pkg_to_proxy import PushAgentPkgToProxyService
from .push_files_to_proxy import PushFilesToProxyService
from .push_host_identifier import PushIdentifierHostsService
from .push_upgrade_package import PushUpgradeFileService
Expand Down Expand Up @@ -193,3 +194,9 @@ class PushIdentifierHostsComponent(Component):
name = _("推送主机身份信息")
code = "push_host_identifier"
bound_service = PushIdentifierHostsService


class PushAgentPkgToProxyComponent(Component):
# Component registration for PushAgentPkgToProxyService.
# `name` is the translatable display name, `code` is the identifier referenced when building the activity.
name = _("下发 Agent 安装包到 Proxy")
code = "push_agent_pkg_to_proxy"
bound_service = PushAgentPkgToProxyService
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from dataclasses import dataclass, fields
from typing import Dict, List

from django.conf import settings

from apps.core.files.storage import get_storage
from apps.node_man import constants, models

from .base import AgentCommonData, AgentTransferFileService


@dataclass
class PushAgentPkgCommonData(AgentCommonData):
# AgentCommonData extended with two lookup maps that PushAgentPkgToProxyService._execute builds once up front.
# Cloud area ID -> list of alive Proxy hosts in that cloud area
# NOTE(review): "alive" comes from the original comment; the filter lives in get_cloud_id__proxies_map, not shown here.
cloud_id__proxies_map: Dict[int, List[models.Host]]
# Install channel ID -> list of host objects of that channel (used as the target machines in get_target_servers)
install_channel_id__host_objs_map: Dict[int, List[models.Host]]


class PushAgentPkgToProxyService(AgentTransferFileService):
    """
    Distribute the Agent installation package to Proxy hosts.

    Applies to P-Agent installation, installation through an install channel, and similar scenarios.
    """

    def _execute(self, data, parent_data, common_data: AgentCommonData):
        """
        Enrich the common data with proxy / install-channel lookups, then delegate to the parent.

        The install channels and proxies are queried in batch here so that get_target_servers and
        get_job_file_params do not have to query per host inside a loop.

        :param data: activity input data, passed through to the parent implementation
        :param parent_data: parent pipeline data, passed through to the parent implementation
        :param common_data: common data of this batch; every dataclass field is copied into the
            extended PushAgentPkgCommonData
        :return: whatever the parent _execute returns
        """
        cloud_ids = {host.bk_cloud_id for host in common_data.host_id_obj_map.values()}
        cloud_id__proxies_map = self.get_cloud_id__proxies_map(cloud_ids)
        # Only hosts that actually use an install channel contribute a channel ID
        install_channel_ids = [
            host.install_channel_id for host in common_data.host_id_obj_map.values() if host.install_channel_id
        ]
        install_channel_id__host_objs_map = models.InstallChannel.install_channel_id__host_objs_map(install_channel_ids)
        push_agent_pkg_common_data = PushAgentPkgCommonData(
            cloud_id__proxies_map=cloud_id__proxies_map,
            install_channel_id__host_objs_map=install_channel_id__host_objs_map,
            **{field.name: getattr(common_data, field.name) for field in fields(common_data)}
        )
        return super()._execute(data, parent_data, push_agent_pkg_common_data)

    def get_target_servers(self, data, common_data: PushAgentPkgCommonData, host: models.Host):
        """
        Resolve the machines that receive the package on behalf of ``host``.

        If the host uses an install channel, the targets are that channel's hosts (jump servers);
        otherwise they are the proxies of the host's cloud area.

        :param data: activity input data (unused here)
        :param common_data: extended common data carrying the pre-built lookup maps
        :param host: the host being installed
        :return: dict with "ip_list" (cloud ID + IP per target) and "host_id_list" (target host IDs)
        :raises KeyError: if the install channel / cloud area of ``host`` is missing from the lookup maps
        """
        target_servers = {"ip_list": [], "host_id_list": []}
        if host.install_channel_id:
            for install_channel_host in common_data.install_channel_id__host_objs_map[host.install_channel_id]:
                target_servers["ip_list"].append(
                    {"bk_cloud_id": install_channel_host.bk_cloud_id, "ip": install_channel_host.inner_ip}
                )
                target_servers["host_id_list"].append(install_channel_host.bk_host_id)
        else:
            for proxy in common_data.cloud_id__proxies_map[host.bk_cloud_id]:
                # The target is the proxy itself, so the IP must be the proxy's IP, not the IP of the
                # host being installed; this keeps ip_list consistent with host_id_list.
                target_servers["ip_list"].append({"bk_cloud_id": proxy.bk_cloud_id, "ip": proxy.inner_ip})
                target_servers["host_id_list"].append(proxy.bk_host_id)
        return target_servers

    def get_job_file_params(self, data, common_data: AgentCommonData, host: models.Host):
        """
        Build one file-transfer parameter set per CPU architecture directory.

        The GSE 2.0 Agent package directory is constrained to the following layout:
        ├── agent
        │   ├── linux
        │   │   ├── x86_64
        │   │   │   └── gse_agent-2.0.0.tgz
        │   │   └── aarch64
        │   │       └── gse_agent-2.0.0.tgz
        │   └── windows
        │       └── x86_64
        │           └── gse_agent-2.0.0.tgz

        :param data: activity input data (unused here)
        :param common_data: common data; provides the access point of each host
        :param host: the host being installed; its os_type selects the package directory
        :return: list of {"file_list": [...], "file_target_path": ...}, one per directory found
            under <download_path>/agent/<os_type>
        """
        pkg_name = self.get_agent_pkg_name(common_data, host)
        host_ap = common_data.host_id__ap_map[host.bk_host_id]
        # Fall back to the global download path when the access point defines no nginx path
        download_path = host_ap.nginx_path or settings.DOWNLOAD_PATH
        agent_path = constants.LINUX_SEP.join([download_path, "agent", host.os_type.lower()])
        storage = get_storage()
        # First element of listdir() is taken as the sub-directories, i.e. the CPU architectures
        # NOTE(review): presumably follows Django's Storage.listdir (directories, files) contract - confirm
        cpu_arch_list, _ = storage.listdir(agent_path)
        return [
            {
                "file_list": ["/".join([agent_path, cpu_arch, pkg_name])],
                "file_target_path": "/".join([agent_path, cpu_arch]),
            }
            for cpu_arch in cpu_arch_list
        ]

    def get_job_param_os_type(self, host: models.Host) -> str:
        """Return the OS type used for the job; always Linux regardless of ``host``."""
        # A Proxy or an install-channel jump server is required to be a Linux machine
        return constants.OsType.LINUX
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def get_upgrade_package_source_path(self, common_data: AgentCommonData, host: mo

def get_file_list(self, data, common_data: AgentCommonData, host: models.Host) -> List[str]:
root_path, agent_path = self.get_upgrade_package_source_path(common_data, host)
agent_upgrade_package_name = self.get_agent_upgrade_pkg_name(common_data, host=host)
agent_upgrade_package_name = self.get_agent_pkg_name(common_data, host=host, is_upgrade=True)

file_list: List[str] = [os.path.join(agent_path, agent_upgrade_package_name)]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def script_name(self):
return "upgrade_command"

def get_script_content(self, data, common_data: AgentCommonData, host: models.Host) -> str:
agent_upgrade_pkg_name = self.get_agent_upgrade_pkg_name(common_data, host)
agent_upgrade_pkg_name = self.get_agent_pkg_name(common_data, host, is_upgrade=True)
general_node_type = self.get_general_node_type(host.node_type)
agent_config = common_data.host_id__ap_map[host.bk_host_id].get_agent_config(host.os_type)

Expand Down
103 changes: 60 additions & 43 deletions apps/backend/components/collections/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,21 @@ def append_unique_key_params_info(

return multi_job_params_map

def get_target_servers(
    self, data, common_data: CommonData, host: models.Host
) -> Dict[str, Union[List[Dict[str, Union[int, str]]], List[int]]]:
    """
    Get the servers a job is executed on; by default this is the host itself.

    :param data: activity input data (unused by this default implementation)
    :param common_data: common data of the batch (unused by this default implementation)
    :param host: host object
    :return: target servers, as {"ip_list": [...], "host_id_list": [...]}
    """
    ip_entry = {"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}
    return {"ip_list": [ip_entry], "host_id_list": [host.bk_host_id]}

def get_job_param_os_type(self, host: models.Host) -> str:
# OS type placed into the "os_type" job parameter; the default is the host's own os_type.
return host.os_type


class JobExecuteScriptService(JobV3BaseService, metaclass=abc.ABCMeta):
def inputs_format(self):
Expand Down Expand Up @@ -446,7 +461,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
"script_content": script_content,
"script_param": script_param,
"timeout": timeout,
"os_type": host_obj.os_type,
"os_type": self.get_job_param_os_type(host_obj),
},
}

Expand All @@ -472,18 +487,6 @@ def get_script_param(self, data, common_data: CommonData, host: models.Host) ->
"""
return data.get_one_of_inputs("script_param", default="")

def get_target_servers(
self, data, common_data: CommonData, host: models.Host
) -> Dict[str, Union[List[Dict[str, Union[int, str]]], List[int]]]:
"""
获取目标服务器
:param data:
:param common_data:
:param host: 主机类型
:return: 目标服务器
"""
return {"ip_list": [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}], "host_id_list": [host.bk_host_id]}


class JobTransferFileService(JobV3BaseService, metaclass=abc.ABCMeta):
def inputs_format(self):
Expand All @@ -499,37 +502,36 @@ def _execute(self, data, parent_data, common_data: CommonData):
for sub_inst in common_data.subscription_instances:
bk_host_id = sub_inst.instance_info["host"]["bk_host_id"]
host_obj = common_data.host_id_obj_map[bk_host_id]
target_servers = self.get_target_servers(data=data, common_data=common_data, host=host_obj)

file_list = self.get_file_list(data=data, common_data=common_data, host=host_obj)
file_target_path = self.get_file_target_path(data=data, common_data=common_data, host=host_obj)
# 如果分发的文件列表 & 目标路径一致,合并到一个作业中,提高执行效率
md5_key = f"{self.get_md5('|'.join(sorted(file_list)))}-{file_target_path}"

if md5_key in multi_job_params_map:
multi_job_params_map = self.append_unique_key_params_info(
multi_job_params_map=multi_job_params_map, unique_key=md5_key, host_obj=host_obj, sub_inst=sub_inst
)
else:
multi_job_params_map[md5_key] = {
"job_func": JobApi.fast_transfer_file,
"subscription_instance_id": [sub_inst.id],
"subscription_id": common_data.subscription.id,
"job_params": {
"target_server": {
"ip_list": [
{
"bk_cloud_id": host_obj.bk_cloud_id,
"ip": host_obj.inner_ip,
}
],
"host_id_list": [host_obj.bk_host_id],
# 所有的逻辑处理都基于 host_obj, 仅执行目标使用 target_host
job_file_params = self.get_job_file_params(data=data, common_data=common_data, host=host_obj)
for job_file_param in job_file_params:
file_list = job_file_param["file_list"]
file_target_path = job_file_param["file_target_path"]
# 如果分发的文件列表 & 目标路径一致,合并到一个作业中,提高执行效率
md5_key = f"{self.get_md5('|'.join(sorted(file_list)))}-{file_target_path}"

if md5_key in multi_job_params_map:
multi_job_params_map = self.append_unique_key_params_info(
multi_job_params_map=multi_job_params_map,
unique_key=md5_key,
host_infos=target_servers,
sub_inst=sub_inst,
)
else:
multi_job_params_map[md5_key] = {
"job_func": JobApi.fast_transfer_file,
"subscription_instance_id": [sub_inst.id],
"subscription_id": common_data.subscription.id,
"job_params": {
"target_server": target_servers,
"file_target_path": file_target_path,
"file_source_list": [{"file_list": file_list}],
"timeout": timeout,
"os_type": self.get_job_param_os_type(host_obj),
},
"file_target_path": file_target_path,
"file_source_list": [{"file_list": file_list}],
"timeout": timeout,
"os_type": host_obj.os_type,
},
}
}

self.run_job_or_finish_schedule(multi_job_params_map)

Expand All @@ -553,6 +555,21 @@ def get_file_target_path(self, data, common_data: CommonData, host: models.Host)
"""
return data.get_one_of_inputs("file_target_path", default="")

def get_job_file_params(
    self, data, common_data: CommonData, host: models.Host
) -> List[Dict[str, Union[List[str], str]]]:
    """
    Describe the file-transfer jobs needed for ``host`` as (file list, target path) pairs.

    1. In the common simple case there is a single target path, so one JOB task is enough.
    2. In complex cases there are several target paths. Because one file-distribution task on the
       JOB platform allows only one target path, several tasks are required; subclasses implement
       the mapping between file sources and targets.

    :param data: activity input data
    :param common_data: common data of the batch
    :param host: host object
    :return: list with a single entry built from get_file_list and get_file_target_path
    """
    file_list = self.get_file_list(data=data, common_data=common_data, host=host)
    file_target_path = self.get_file_target_path(data=data, common_data=common_data, host=host)
    return [{"file_list": file_list, "file_target_path": file_target_path}]


class JobPushConfigService(JobV3BaseService, metaclass=abc.ABCMeta):
def inputs_format(self):
Expand Down Expand Up @@ -616,7 +633,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
"file_target_path": file_target_path,
"file_list": file_source_list,
"timeout": timeout,
"os_type": host_obj.os_type,
"os_type": self.get_job_param_os_type(host_obj),
},
}

Expand Down
4 changes: 1 addition & 3 deletions apps/backend/plugin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,9 +599,7 @@ def start_debug(self, request):
if config_template.plugin_name != package.project:
raise ValidationError("config {} does not belong to plugin {}".format(config_id, package.project))

step_config_templates.append(
{"version": config_template.version, "name": config_template.name}
)
step_config_templates.append({"version": config_template.version, "name": config_template.name})
step_params_context.update(json.loads(config.render_data))

with transaction.atomic():
Expand Down
Loading

0 comments on commit d21c3c3

Please sign in to comment.