Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Agent / Proxy 类操作支持多接入点 Agent 操作 (closed #1714) #1791

Merged
merged 1 commit into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@
from ..base import BaseService, CommonData


@dataclass
class AgentCommonData(CommonData):
    """Common data for agent services: ``CommonData`` extended with access-point (AP) fields."""

    # Default access point
    default_ap: models.AccessPoint
    # Mapping of host ID -> access point
    host_id__ap_map: Dict[int, models.AccessPoint]
    # AgentStep adapter
    agent_step_adapter: AgentStepAdapter
    # Injected AP ID
    # NOTE(review): consumers test this for truthiness / compare it with None,
    # so it may be absent in practice — confirm whether Optional[int] is intended.
    injected_ap_id: int


class AgentBaseService(BaseService, metaclass=abc.ABCMeta):
"""
AGENT安装基类
Expand Down Expand Up @@ -305,17 +317,13 @@ def maintain_agent_proc_status_uniqueness(self, bk_host_ids: Set[int]) -> None:
proc_statuses_to_be_created.append(models.ProcessStatus(bk_host_id=host_id, **self.agent_proc_common_data))
models.ProcessStatus.objects.bulk_create(proc_statuses_to_be_created, batch_size=self.batch_size)


@dataclass
class AgentCommonData(CommonData):
# 默认接入点
default_ap: models.AccessPoint
# 主机ID - 接入点 映射关系
host_id__ap_map: Dict[int, models.AccessPoint]
# AgentStep 适配器
agent_step_adapter: AgentStepAdapter
# 注入AP_ID
injected_ap_id: int
def get_host_ap(self, common_data: AgentCommonData, host: models.Host) -> models.AccessPoint:
    """
    Resolve the access point (AP) to use for ``host``.

    An AP ID injected into ``common_data`` takes priority. Only when
    ``injected_ap_id`` is falsy (e.g. ``None``) does the lookup fall back to
    the host's own entry in ``host_id__ap_map``.

    :param common_data: shared agent data providing ``injected_ap_id``,
        ``ap_id_obj_map`` and ``host_id__ap_map``
    :param host: host whose ``bk_host_id`` keys the fallback lookup
    :return: the access point selected for this host
    :raises KeyError: if the chosen key is missing from the map being indexed
        (assumes both maps are plain dicts — ``host_id__ap_map`` is declared as one)
    """
    # Prefer the injected AP ID when one was provided
    if common_data.injected_ap_id:
        return common_data.ap_id_obj_map[common_data.injected_ap_id]
    # Otherwise use the AP recorded for this specific host
    return common_data.host_id__ap_map[host.bk_host_id]


class RetryHandler:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def get_script_content(self, data, common_data: AgentCommonData, host: models.Ho
path_handler: PathHandler = PathHandler(host.os_type)
ctl_exe_name: str = ("gse_agent", "gse_agent.exe")[host.os_type == constants.OsType.WINDOWS]
general_node_type: str = self.get_general_node_type(host.node_type)
setup_path: str = common_data.host_id__ap_map[host.bk_host_id].get_agent_config(host.os_type)["setup_path"]
host_ap: models.AccessPoint = self.get_host_ap(common_data=common_data, host=host)
setup_path: str = host_ap.get_agent_config(host.os_type)["setup_path"]
agent_path: str = path_handler.join(setup_path, general_node_type, "bin", ctl_exe_name)

return f"{agent_path} --version"
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@

class PushUpgradeFileService(AgentTransferFileService):
def get_file_target_path(self, data, common_data: AgentCommonData, host: models.Host) -> str:
    """
    Return the target directory for the pushed file on ``host``.

    The access point is resolved through ``self.get_host_ap`` rather than by
    indexing ``common_data.host_id__ap_map`` directly, so every caller shares
    one AP-selection rule.

    :param data: not used by this method
    :param common_data: shared agent data forwarded to ``get_host_ap``
    :param host: target host; its ``os_type`` selects the agent config
    :return: the ``temp_path`` entry of the access point's agent config
    """
    host_ap = self.get_host_ap(common_data=common_data, host=host)
    return host_ap.get_agent_config(host.os_type)["temp_path"]

def get_upgrade_package_source_path(self, common_data: AgentCommonData, host: models.Host) -> Tuple[str, str]:
"""
获取升级包源路径
"""
host_ap = self.get_host_ap(common_data=common_data, host=host)
# 1.x 升级到 1.x,使用老的路径,升级包直接放在 download 目录下
agent_path = root_path = common_data.host_id__ap_map[host.bk_host_id].nginx_path or settings.DOWNLOAD_PATH
agent_path = root_path = host_ap.nginx_path or settings.DOWNLOAD_PATH
if not common_data.agent_step_adapter.is_legacy:
# 2.x 升级到 2.x,根据操作系统、CPU 架构等组合路径
agent_path = os.path.join(root_path, "agent", host.os_type.lower(), host.cpu_arch.lower())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def get_script_content(self, data, common_data: AgentCommonData, host: models.Ho
# 路径处理器
path_handler = PathHandler(host.os_type)
general_node_type = self.get_general_node_type(host.node_type)
setup_path = common_data.host_id__ap_map[host.bk_host_id].get_agent_config(host.os_type)["setup_path"]
host_ap: models.AccessPoint = self.get_host_ap(common_data=common_data, host=host)
setup_path = host_ap.get_agent_config(host.os_type)["setup_path"]
agent_path = path_handler.join(setup_path, general_node_type, "bin")
if common_data.agent_step_adapter.is_legacy:
return f"cd {agent_path} && ./gse_agent --reload"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ class RenderAndPushGseConfigService(AgentPushConfigService):
def get_config_info_list(self, data, common_data: AgentCommonData, host: models.Host) -> List[Dict[str, Any]]:
    """
    Build the list of config files to render and push to ``host``.

    The main config file name comes from the agent step adapter, and its
    content is rendered by that adapter for the access point returned by
    ``self.get_host_ap``.

    :param data: not used by this method
    :param common_data: shared agent data providing ``agent_step_adapter``
    :param host: target host; its ``node_type`` is normalised via
        ``self.get_general_node_type``
    :return: a single-element list ``[{"file_name": ..., "content": ...}]``
    """
    file_name = common_data.agent_step_adapter.get_main_config_filename()
    general_node_type = self.get_general_node_type(host.node_type)
    # Resolve the AP once through the shared helper and pass it to the renderer
    host_ap: models.AccessPoint = self.get_host_ap(common_data=common_data, host=host)
    content = common_data.agent_step_adapter.get_config(
        host=host, filename=file_name, node_type=general_node_type, ap=host_ap
    )
    return [{"file_name": file_name, "content": content}]

def get_file_target_path(self, data, common_data: AgentCommonData, host: models.Host) -> str:
    """
    Return the directory on ``host`` that the rendered config is pushed to.

    The path is ``<setup_path>/<general node type>/etc``, joined with a
    ``PathHandler`` built from the host's ``os_type``. ``setup_path`` is read
    from the agent config of the access point returned by ``self.get_host_ap``.

    :param data: not used by this method
    :param common_data: shared agent data forwarded to ``get_host_ap``
    :param host: target host; supplies ``os_type`` and ``node_type``
    :return: the joined target path
    """
    general_node_type = self.get_general_node_type(host.node_type)
    path_handler = PathHandler(host.os_type)
    # Single AP lookup through the shared helper (no direct host_id__ap_map indexing)
    host_ap: models.AccessPoint = self.get_host_ap(common_data=common_data, host=host)
    setup_path = host_ap.get_agent_config(host.os_type)["setup_path"]
    return path_handler.join(setup_path, general_node_type, "etc")
3 changes: 2 additions & 1 deletion apps/backend/components/collections/agent_new/restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ def get_script_content(self, data, common_data: AgentCommonData, host: models.Ho
ctl_exe_name = ("gsectl", "gsectl.bat")[host.os_type == constants.OsType.WINDOWS]
cmd_suffix = ("restart >/dev/null 2>&1", "restart")[host.os_type == constants.OsType.WINDOWS]
general_node_type = self.get_general_node_type(host.node_type)
setup_path = common_data.host_id__ap_map[host.bk_host_id].get_agent_config(host.os_type)["setup_path"]
host_ap: models.AccessPoint = self.get_host_ap(common_data=common_data, host=host)
setup_path = host_ap.get_agent_config(host.os_type)["setup_path"]
agent_path = path_handler.join(setup_path, general_node_type, "bin", ctl_exe_name)

return f"{agent_path} {cmd_suffix}"
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ def script_name(self):
def get_script_content(self, data, common_data: AgentCommonData, host: models.Host) -> str:
agent_upgrade_pkg_name = self.get_agent_pkg_name(common_data, host, is_upgrade=True)
general_node_type = self.get_general_node_type(host.node_type)
agent_config = common_data.host_id__ap_map[host.bk_host_id].get_agent_config(host.os_type)
host_ap: models.AccessPoint = self.get_host_ap(common_data=common_data, host=host)
agent_config = host_ap.get_agent_config(host.os_type)

if host.os_type == constants.OsType.WINDOWS:
scripts = WINDOWS_UPGRADE_CMD_TEMPLATE.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def _execute(self, data, parent_data, common_data: AgentCommonData):
added_extra_data = filter_values(added_extra_data)
# 同名配置覆盖优先级:added_extra_data(新增配置)> host_obj.extra_data(已有配置)> default_extra_data(默认配置)
host_obj.extra_data = dict(ChainMap(added_extra_data, host_obj.extra_data, default_extra_data))
host_obj.ap_id = host_info["ap_id"]
if common_data.injected_ap_id is None:
host_obj.ap_id = host_info["ap_id"]
hosts_to_be_updated.append(host_obj)
models.Host.objects.bulk_update(hosts_to_be_updated, fields=["extra_data", "ap_id"], batch_size=self.batch_size)
11 changes: 10 additions & 1 deletion apps/backend/subscription/steps/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,23 @@ def _generate_activities(self, agent_manager):
def install_other_agent_codes(self):
    """
    Return the step codes covered by this helper, grouped by purpose.

    Each code appears exactly once; ``"get_agent_status"`` is listed only in
    the shared group at the end.

    :return: list of step code strings
    """
    return [
        # Codes for install / reinstall
        "query_password",
        "bind_host_agent",
        "upgrade_to_agent_id",
        "push_agent_pkg_to_proxy",
        "install",
        "check_policy_gse_to_proxy",
        "configure_policy",
        # Codes related to reloading configuration
        "agent_check_agent_status",
        "update_install_info",
        "render_and_push_gse_config",
        "reload_agent_config",
        "wait",
        "check_agent_ability",
        # Codes shared by both groups
        "get_agent_status",
    ]

def generate_activities(
Expand Down
13 changes: 8 additions & 5 deletions apps/backend/subscription/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@
)
from apps.backend.utils.data_renderer import nested_render_data
from apps.component.esbclient import client_v2
from apps.core.ipchooser.tools.base import HostQuerySqlHelper
from apps.exceptions import ComponentCallError
from apps.node_man import constants, models
from apps.node_man import tools as node_man_tools
from apps.utils.basic import chunk_lists, distinct_dict_list, order_dict
from apps.utils.batch_request import batch_request, request_multi_thread
from apps.utils.cache import func_cache_decorator
from apps.utils.time_handler import strftime_local
from apps.core.ipchooser.tools.base import HostQuerySqlHelper

logger = logging.getLogger("app")

Expand Down Expand Up @@ -564,11 +564,13 @@ def get_host_detail(host_info_list: list, bk_biz_id: int = None):
for host_info in host_info_list:
if "bk_host_id" in host_info:
if host_info["bk_host_id"] in host_id_dict:
host_details.append(host_id_dict[host_info["bk_host_id"]])
host_info.update(host_id_dict[host_info["bk_host_id"]])
host_details.append(host_info)
else:
host_key = create_host_key(host_info)
if host_key in host_key_dict:
host_details.append(host_key_dict[host_key])
host_info.update(host_key_dict[host_key])
host_details.append(host_info)

return host_details

Expand Down Expand Up @@ -681,7 +683,7 @@ def wrapper(scope: Dict[str, Union[Dict, Any]], *args, **kwargs) -> Dict[str, Di
"object_type": scope["object_type"],
"node_type": scope["node_type"],
"nodes": list(nodes),
"instance_selector": scope.get("instance_selector")
"instance_selector": scope.get("instance_selector"),
},
**kwargs,
}
Expand Down Expand Up @@ -806,6 +808,7 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,

if not need_register:
# 补充必要的主机或实例相关信息

add_host_info_to_instances(bk_biz_id, scope, instances)
add_scope_info_to_instances(nodes, scope, instances, module_to_topo)
add_process_info_to_instances(bk_biz_id, scope, instances)
Expand All @@ -831,7 +834,7 @@ def get_instances_by_scope(scope: Dict[str, Union[Dict, int, Any]]) -> Dict[str,
instance_selector_host_ids = HostQuerySqlHelper.multiple_cond_sql(
params={"bk_host_id": bk_host_ids, "conditions": instance_selector},
biz_scope=[bk_biz_id],
return_all_node_type=True
return_all_node_type=True,
).values_list("bk_host_id", flat=True)

selector_instances_dict = {}
Expand Down
10 changes: 10 additions & 0 deletions apps/mock_data/views_mkd/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,13 @@


JOB_OPERATE_REQUEST_PARAMS = {"job_type": constants.JobType.REINSTALL_AGENT, "bk_host_id": [host.DEFAULT_HOST_ID]}
# Mock request params for an agent reinstall job whose target is described by a
# "hosts" list (bk_cloud_id / ap_id / inner_ip) instead of a bk_host_id list.
JOB_OPERATE_REQUEST_PARAMS_WITHOUT_HOST_ID = {
    "job_type": constants.JobType.REINSTALL_AGENT,
    "hosts": [
        {
            "bk_cloud_id": constants.DEFAULT_CLOUD,
            "ap_id": constants.DEFAULT_AP_ID,
            "inner_ip": host.DEFAULT_IP,
        }
    ],
}
16 changes: 6 additions & 10 deletions apps/node_man/handlers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

from django.conf import settings
from django.core.paginator import Paginator
from django.db.models import Q
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import get_language
from django.utils.translation import ugettext_lazy as _
Expand Down Expand Up @@ -187,9 +187,7 @@ def list(self, params: dict, username: str):
biz_scope_query_q = Q()
else:
biz_scope_query_q = reduce(
operator.or_,
[Q(bk_biz_scope__contains=bk_biz_id) for bk_biz_id in biz_scope],
Q()
operator.or_, [Q(bk_biz_scope__contains=bk_biz_id) for bk_biz_id in biz_scope], Q()
)
# 仅查询所有业务时,自身创建的 job 可见
if not search_biz_ids:
Expand Down Expand Up @@ -616,15 +614,13 @@ def update_host(accept_list: list, ip_filter_list: list, is_manual: bool = False

return update_data_info["subscription_host_ids"], ip_filter_list

def operate(self, job_type, bk_host_ids, bk_biz_scope, extra_params, extra_config):
def operate(self, job_type, hosts, bk_biz_scope, extra_params, extra_config):
"""
用于只有bk_host_id参数的下线、重启等操作
"""
# 校验器进行校验

subscription = self.create_subscription(
job_type, bk_host_ids, extra_params=extra_params, extra_config=extra_config
)
subscription = self.create_subscription(job_type, hosts, extra_params=extra_params, extra_config=extra_config)

return tools.JobTools.create_job(
job_type=job_type,
Expand All @@ -634,9 +630,9 @@ def operate(self, job_type, bk_host_ids, bk_biz_scope, extra_params, extra_confi
statistics={
"success_count": 0,
"failed_count": 0,
"pending_count": len(bk_host_ids),
"pending_count": len(hosts),
"running_count": 0,
"total_count": len(bk_host_ids),
"total_count": len(hosts),
},
)

Expand Down
4 changes: 2 additions & 2 deletions apps/node_man/handlers/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,14 @@ def operate(params: dict, username: str):
).values("bk_host_id", "bk_biz_id", "bk_cloud_id", "inner_ip", "node_type", "os_type")

# 校验器进行校验
db_host_ids, host_biz_scope = operate_validator(list(db_host_sql))
db_hosts, host_biz_scope = operate_validator(list(db_host_sql))

plugin_name__job_id__map = {}
for plugin_params in params["plugin_params_list"]:
plugin_name, plugin_version = plugin_params["name"], plugin_params["version"]
subscription_create_result = PluginHandler.create_subscription(
job_type=params["job_type"],
nodes=db_host_ids,
nodes=db_hosts,
name=plugin_name,
version=plugin_version,
keep_config=plugin_params.get("keep_config"),
Expand Down
16 changes: 10 additions & 6 deletions apps/node_man/handlers/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ def bulk_update_validate(
for host in accept_list:
# 系统变更/接入点变更/DHT变更需要更新主机
host_extra_data = host_info[host["bk_host_id"]]["extra_data"] or {}
if host.get("is_need_inject_ap_id"):
host["ap_id"] = host_info[host["bk_host_id"]]["ap_id"]

if (
host.get("os_type") != host_info[host["bk_host_id"]]["os_type"]
or host.get("ap_id") != host_info[host["bk_host_id"]]["ap_id"]
or (host.get("ap_id") != host_info[host["bk_host_id"]]["ap_id"] and not host.get("is_need_inject_ap_id"))
or host.get("bt_speed_limit") != host_extra_data.get("bt_speed_limit")
or host.get("peer_exchange_switch_for_agent") != host_extra_data.get("peer_exchange_switch_for_agent")
or host.get("enable_compression") != host_extra_data.get("enable_compression")
Expand All @@ -183,6 +181,7 @@ def bulk_update_validate(
{
"bk_host_id": host["bk_host_id"],
"bk_cloud_id": host["bk_cloud_id"],
"ap_id": host["ap_id"],
"install_channel_id": host.get("install_channel_id"),
"is_need_inject_ap_id": host.get("is_need_inject_ap_id"),
}
Expand Down Expand Up @@ -567,7 +566,7 @@ def install_validate(
return ip_filter_list, accept_list, proxy_not_alive


def operate_validator(db_host_sql):
def operate_validator(db_host_sql, host_info: typing.Dict[int, typing.Dict[str, typing.Any]] = {}):
"""
用于operate任务的校验
:param db_host_sql: 用户操作主机的详细信息
Expand Down Expand Up @@ -595,6 +594,11 @@ def operate_validator(db_host_sql):
# 获得业务ID
host_biz_scope = list({host["bk_biz_id"] for host in db_host_sql})

db_host_ids = [{"bk_host_id": host_id} for host_id in permission_host_ids]
db_hosts: typing.List[typing.Dict[str, typing.Any]] = []

return db_host_ids, host_biz_scope
for host_id in permission_host_ids:
_host = {"bk_host_id": host_id}
_host.update(host_info.get(host_id, {}))
db_hosts.append(_host)

return db_hosts, host_biz_scope
Loading
Loading