Skip to content

Commit

Permalink
feature: JOB API 适配 (closed #785)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW authored and ZhuoZhuoCrayon committed Jul 12, 2022
1 parent 21cac04 commit ea16267
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 83 deletions.
4 changes: 3 additions & 1 deletion apps/backend/agent/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ def arg_parser() -> argparse.ArgumentParser:
"account": "root",
"is_param_sensitive": 1,
"script_type": 4,
"target_server": {"ip_list": [{"bk_cloud_id": proxy.bk_cloud_id, "ip": proxy.inner_ip}]},
"target_server": {
"ip_list": [{"bk_cloud_id": proxy.bk_cloud_id, "ip": proxy.inner_ip, "host_id": proxy.bk_host_id}]
},
}
data = client_v2.job.fast_execute_script(kwargs)
job_instance_id = data["job_instance_id"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ def script_name(self):
def get_target_servers(self, data, common_data: CommonData, host: models.Host) -> List[Dict[str, Any]]:
# 取接入点
ap = common_data.ap_id_obj_map[host.ap_id]
return [{"bk_cloud_id": constants.DEFAULT_CLOUD, "ip": bt_server["inner_ip"]} for bt_server in ap.btfileserver]
return [
{
"bk_cloud_id": constants.DEFAULT_CLOUD,
"ip": bt_server["inner_ip"],
"bk_host_id": bt_server.get("bk_host_id", ""),
}
for bt_server in ap.btfileserver
]

def get_script_content(self, data, common_data: AgentCommonData, host: models.Host) -> str:
port_config = common_data.host_id__ap_map[host.bk_host_id].port_config
Expand Down
16 changes: 14 additions & 2 deletions apps/backend/components/collections/agent_new/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import socket
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from django.conf import settings
from django.utils.translation import ugettext_lazy as _
Expand Down Expand Up @@ -386,10 +386,22 @@ def execute_job_commands(self, sub_inst_id, installation_tool: InstallationTools

# 使用全业务执行作业
bk_biz_id = settings.BLUEKING_BIZ_ID
target_server: Dict[str, List[Union[int, Dict[str, Union[int, str]]]]] = (
{"host_id_list": [jump_server.bk_host_id]}
if settings.ENABLE_DBCP
else {
"ip_list": [
{
"ip": jump_server.inner_ip,
"bk_cloud_id": jump_server.bk_cloud_id,
}
]
}
)
kwargs = {
"bk_biz_id": bk_biz_id,
"task_name": f"NODEMAN_{sub_inst_id}_{self.__class__.__name__}",
"target_server": {"ip_list": [{"ip": jump_server.inner_ip, "bk_cloud_id": jump_server.bk_cloud_id}]},
"target_server": target_server,
"timeout": constants.JOB_TIMEOUT,
"account_alias": settings.BACKEND_UNIX_ACCOUNT,
"script_language": constants.ScriptLanguageType.PYTHON.value,
Expand Down
44 changes: 35 additions & 9 deletions apps/backend/components/collections/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ def request_single_job_and_create_map(
self, job_func, subscription_instance_id: [int, list], subscription_id, job_params: dict
):
"""请求作业平台并创建与订阅实例的映射"""
if not job_params["target_server"]["ip_list"]:
# 没有IP列表时,不发起请求
return []
host_interaction_from = ("ip_list", "host_id_list")[settings.BKAPP_ENABLE_DHCP]
host_interaction_data_list = job_params["target_server"].get(host_interaction_from, [])

if not host_interaction_data_list:
return []
job_params["target_server"] = {host_interaction_from: host_interaction_data_list}
# 补充作业平台通用参数
if not job_params.get("os_type"):
job_params["os_type"] = self.DEFAULT_OS_TYPE
Expand Down Expand Up @@ -204,7 +206,7 @@ def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) ->
# 构造主机作业状态映射表
cloud_ip_status_map: Dict[str, Dict] = {}
for ip_result in ip_results["step_instance_list"][0].get("step_ip_result_list") or []:
cloud_ip_status_map[f'{ip_result["bk_cloud_id"]}-{ip_result["ip"]}'] = ip_result
cloud_ip_status_map[f'{ip_result["bk_cloud_id"]}-{ip_result["ip"]}-{ip_result["bk_host_id"]}'] = ip_result

succeed_sub_inst_ids: List[int] = []
subscription_instances = models.SubscriptionInstanceRecord.objects.filter(
Expand All @@ -213,7 +215,7 @@ def handler_job_result(self, job_sub_map: models.JobSubscriptionInstanceMap) ->

bk_host_ids = [sub_inst.instance_info["host"]["bk_host_id"] for sub_inst in subscription_instances]
host_id__cloud_ip_map = {
host_info["bk_host_id"]: f"{host_info['bk_cloud_id']}-{host_info['inner_ip']}"
host_info["bk_host_id"]: f"{host_info['bk_cloud_id']}-{host_info['inner_ip']}-{host_info['bk_host_id']}"
for host_info in models.Host.objects.filter(bk_host_id__in=bk_host_ids).values(
"bk_host_id", "inner_ip", "bk_cloud_id"
)
Expand Down Expand Up @@ -392,13 +394,17 @@ def _execute(self, data, parent_data, common_data: CommonData):
if md5_key in multi_job_params_map:
multi_job_params_map[md5_key]["subscription_instance_id"].append(sub_inst.id)
multi_job_params_map[md5_key]["job_params"]["target_server"]["ip_list"].extend(target_servers)
multi_job_params_map[md5_key]["job_params"]["target_server"]["ip_list"].append(
{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}
)
multi_job_params_map[md5_key]["job_params"]["target_server"]["host_id_list"].append(host_obj.bk_host_id)
else:
multi_job_params_map[md5_key] = {
"job_func": JobApi.fast_execute_script,
"subscription_instance_id": [sub_inst.id],
"subscription_id": common_data.subscription.id,
"job_params": {
"target_server": {"ip_list": target_servers},
"target_server": {"ip_list": target_servers, "host_id_list": host_obj.bk_host_id},
"script_content": script_content,
"script_param": script_param,
"timeout": timeout,
Expand Down Expand Up @@ -436,7 +442,7 @@ def get_target_servers(self, data, common_data: CommonData, host: models.Host) -
:param host: 主机类型
:return: 目标服务器
"""
return [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}]
return [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip, "bk_host_id": host.bk_host_id}]


class JobTransferFileService(JobV3BaseService, metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -464,13 +470,22 @@ def _execute(self, data, parent_data, common_data: CommonData):
multi_job_params_map[md5_key]["job_params"]["target_server"]["ip_list"].append(
{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}
)
multi_job_params_map[md5_key]["job_params"]["target_server"]["host_id_list"].append(host_obj.bk_host_id)
else:
multi_job_params_map[md5_key] = {
"job_func": JobApi.fast_transfer_file,
"subscription_instance_id": [sub_inst.id],
"subscription_id": common_data.subscription.id,
"job_params": {
"target_server": {"ip_list": [{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}]},
"target_server": {
"ip_list": [
{
"bk_cloud_id": host_obj.bk_cloud_id,
"ip": host_obj.inner_ip,
}
],
"host_id_list": [host_obj.bk_host_id],
},
"file_target_path": file_target_path,
"file_source_list": [{"file_list": file_list}],
"timeout": timeout,
Expand Down Expand Up @@ -538,6 +553,9 @@ def _execute(self, data, parent_data, common_data: CommonData):
multi_job_params_map[job_unique_key]["job_params"]["target_server"]["ip_list"].append(
{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}
)
multi_job_params_map[job_unique_key]["job_params"]["target_server"]["host_id_list"].append(
host_obj.bk_host_id
)
else:
file_source_list = []
for config_info in config_info_list:
Expand All @@ -549,7 +567,15 @@ def _execute(self, data, parent_data, common_data: CommonData):
"subscription_instance_id": [sub_inst.id],
"subscription_id": common_data.subscription.id,
"job_params": {
"target_server": {"ip_list": [{"bk_cloud_id": host_obj.bk_cloud_id, "ip": host_obj.inner_ip}]},
"target_server": {
"ip_list": [
{
"bk_cloud_id": host_obj.bk_cloud_id,
"ip": host_obj.inner_ip,
}
],
"host_id_list": [host_obj.bk_host_id],
},
"file_target_path": file_target_path,
"file_list": file_source_list,
"timeout": timeout,
Expand Down
60 changes: 39 additions & 21 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import os
from collections import defaultdict
from functools import reduce
from typing import Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from django.conf import settings
from django.db import IntegrityError
Expand Down Expand Up @@ -535,6 +535,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
os_type = host.os_type.lower() or constants.OsType.LINUX.lower()
package_path = "/".join((nginx_path, os_type, host.cpu_arch, package.pkg_name))
jobs[package_path]["ip_list"].append({"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip})
jobs[package_path]["host_id_list"].append(host.bk_host_id)
jobs[package_path]["subscription_instance_ids"].append(subscription_instance.id)
jobs[package_path]["temp_path"] = agent_config["temp_path"]
jobs[package_path]["os_type"] = host.os_type
Expand All @@ -544,7 +545,6 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
for package_path, job in jobs.items():
file_list = [package_path]
file_list = self.append_extra_files(job["os_type"], file_list, nginx_path)

multi_job_params.append(
{
"job_func": JobApi.fast_transfer_file,
Expand All @@ -554,7 +554,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"file_target_path": job["temp_path"],
"file_source_list": [{"file_list": file_list}],
"os_type": job["os_type"],
"target_server": {"ip_list": job["ip_list"]},
"target_server": {"ip_list": job["ip_list"], "host_id_list": job["host_id_list"]},
},
}
)
Expand Down Expand Up @@ -614,7 +614,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
group_id_instance_map = common_data.group_id_instance_map

# 批量请求作业平台的参数
multi_job_params_map = {}
multi_job_params_map: Dict[str, Dict[str, Any]] = {}
for process_status in process_statuses:
if self.need_skipped(process_status, common_data):
continue
Expand All @@ -639,13 +639,17 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
multi_job_params_map[key]["job_params"]["target_server"]["ip_list"].append(
{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}
)
multi_job_params_map[key]["job_params"]["target_server"]["host_id_list"].append(host.bk_host_id)
else:
multi_job_params_map[key] = {
"job_func": JobApi.fast_execute_script,
"subscription_instance_id": [subscription_instance_id],
"subscription_id": common_data.subscription.id,
"job_params": {
"target_server": {"ip_list": [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}]},
"target_server": {
"ip_list": [{"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}],
"host_id_list": [host.bk_host_id],
},
"script_content": script_content,
"script_param": script_param,
"timeout": timeout,
Expand Down Expand Up @@ -816,17 +820,19 @@ def allocate_port_to_process_status(
bk_host_id = process_status.bk_host_id

# 查询并解析该主机已被占用的端口号
result = JobApi.get_job_instance_ip_log(
{
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"job_instance_id": job_instance_id,
"step_instance_id": step_instance_id,
"bk_cloud_id": host.bk_cloud_id,
"ip": host.inner_ip,
}
instance_log_base_params: Dict[str, Union[str, int]] = {
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"job_instance_id": job_instance_id,
"step_instance_id": step_instance_id,
}
host_interaction_params: Dict[str, Union[str, int]] = (
{"bk_host_id": host.bk_host_id}
if settings.BKAPP_ENABLE_DHCP
else {"bk_cloud_id": host.bk_cloud_id, "ip": host.inner_ip}
)
result = JobApi.get_job_instance_ip_log({**instance_log_base_params, **host_interaction_params})
used_ports = self.parse_used_port(result.get("log_content", ""))
port_range_list = models.ProcControl.parse_port_range(plugin_control.port_range)
queryset = models.ProcessStatus.objects.filter(bk_host_id=bk_host_id)
Expand Down Expand Up @@ -931,7 +937,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
subscription_step = models.SubscriptionStep.objects.get(id=subscription_step_id)

# 组装调用作业平台的参数
multi_job_params_map = {}
multi_job_params_map: Dict[str, Dict[str, Any]] = {}
for process_status in process_statuses:
target_bk_host_id = process_status.bk_host_id
subscription_instance = group_id_instance_map.get(process_status.group_id)
Expand Down Expand Up @@ -967,6 +973,9 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
multi_job_params_map[key]["job_params"]["target_server"]["ip_list"].append(
{"bk_cloud_id": target_host.bk_cloud_id, "ip": target_host.inner_ip}
)
multi_job_params_map[key]["job_params"]["target_server"]["host_id_list"].append(
target_host.bk_host_id
)
else:
multi_job_params_map[key] = {
"job_func": JobApi.push_config_file,
Expand All @@ -975,7 +984,13 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"job_params": {
"os_type": target_host.os_type,
"target_server": {
"ip_list": [{"bk_cloud_id": target_host.bk_cloud_id, "ip": target_host.inner_ip}]
"ip_list": [
{
"bk_cloud_id": target_host.bk_cloud_id,
"ip": target_host.inner_ip,
}
],
"host_id_list": [target_host.bk_host_id],
},
"file_target_path": file_target_path,
"file_list": [{"file_name": file_name, "content": process_parms(file_content)}],
Expand Down Expand Up @@ -1331,20 +1346,23 @@ def _schedule(self, data, parent_data, callback_data=None):
)
# 调试插件时仅有一个IP,以下取值方式与作业平台API文档一致,不会抛出 IndexError/KeyError 的异常
step_instance_id = result["step_instance_list"][0]["step_instance_id"]
bk_host_id = result["step_instance_list"][0]["step_ip_result_list"][0]["bk_host_id"]
ip = result["step_instance_list"][0]["step_ip_result_list"][0]["ip"]
bk_cloud_id = result["step_instance_list"][0]["step_ip_result_list"][0]["bk_cloud_id"]

params = {
instance_log_base_params: Dict[str, Union[str, int]] = {
"job_instance_id": job_instance_id,
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"bk_scope_type": constants.BkJobScopeType.BIZ_SET.value,
"bk_scope_id": settings.BLUEKING_BIZ_ID,
"bk_username": settings.BACKEND_JOB_OPERATOR,
"step_instance_id": step_instance_id,
"ip": ip,
"bk_cloud_id": bk_cloud_id,
"bk_host_id": bk_host_id,
}
task_result = JobApi.get_job_instance_ip_log(params)
host_interaction_params: Dict[str, Union[str, int]] = (
{"bk_host_id": bk_host_id} if settings.BKAPP_ENABLE_DHCP else {"ip": ip, "bk_cloud_id": bk_cloud_id}
)
task_result = JobApi.get_job_instance_ip_log({**instance_log_base_params, **host_interaction_params})

# 只写入新的日志,保证轮询过程中不会重复写入job的日志
last_logs = data.get_one_of_outputs("last_logs", "")
Expand Down
3 changes: 3 additions & 0 deletions apps/backend/tests/api/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ class TestJob(TestCase):
{
"total_time": 0.001,
"ip": "127.0.0.1",
"host_id": 1,
"start_time": "2019-08-12 19:04:07 +0800",
"log_content": "\u30102019-08-12 19:04:07.757\u3011 FileName\uff1a/data/dev_"
"pipeline_unit_test FileSize\uff1a9.0 Byte State\uff1adownload"
Expand Down Expand Up @@ -212,6 +213,7 @@ def test_fast_execute_script(self):
assert len(task_result["success"]) == 1
assert set(task_result["success"][0].keys()) == {
"ip",
"host_id",
"log_content",
"bk_cloud_id",
"error_code",
Expand All @@ -236,6 +238,7 @@ def test_fast_push_file(self):
assert len(task_result["success"]) == 1
assert set(task_result["success"][0].keys()) == {
"ip",
"host_id",
"log_content",
"bk_cloud_id",
"error_code",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def structure_mock_data(cls):
info_to_be_updated = {
"ip": host_obj.inner_ip,
"bk_cloud_id": host_obj.bk_cloud_id,
"bk_host_id": host_obj.bk_host_id,
"status": constants.BkJobStatus.SUCCEEDED,
}
step_ip_result_list.append(dict(ChainMap(info_to_be_updated, step_ip_result)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import base64
from typing import Dict

from django.test import override_settings
from django.utils.translation import ugettext_lazy as _

from apps.backend.components.collections.agent_new import (
Expand All @@ -31,6 +32,7 @@ def get_default_case_name(cls) -> str:
def component_cls(self):
return components.CheckPolicyGseToProxyComponent

@override_settings(BKAPP_ENABLE_DHCP=False)
def tearDown(self) -> None:
record = self.job_api_mock_client.call_recorder.record
host_key__script_content_map: Dict[str, str] = {}
Expand Down
Loading

0 comments on commit ea16267

Please sign in to comment.