Skip to content

Commit

Permalink
feature: 作业平台文件分发支持制品库(#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Sep 2, 2021
1 parent e6afa56 commit 5238b37
Show file tree
Hide file tree
Showing 33 changed files with 1,038 additions and 181 deletions.
55 changes: 16 additions & 39 deletions apps/backend/api/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from blueapps.utils.esbclient import get_client_by_user
from django.conf import settings

from apps.core.files.storage import get_storage

from .constants import (
ACCOUNT_MAP,
POLLING_INTERVAL,
Expand Down Expand Up @@ -286,44 +288,19 @@ def fast_push_file(
:param timeout: 超时时间
:return: Job任务id
"""
params = {
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"task_name": task_name,
"timeout": timeout,
"file_target_path": file_target_path,
"file_source": file_source,
"ip_list": ip_list,
"account": self.account,
"bk_username": self.username,
}
response = self.client.job.fast_push_file(params)
data = self._response_exception_filter("client.job.fast_push_file", params, response)

return data["job_instance_id"]

file_source_list = []
for file_source_item in file_source:
file_source_list.append({"file_list": file_source_item.get("files", [])})

class DummyJobClient(JobClient):
"""
dummy job client
"""

TASK_RESULT = {
"success": [{"ip": "127.0.0.1", "bk_cloud_id": 0, "log_content": "dummy-job-log-content"}],
"pending": [],
"failed": [],
}

def fast_push_file(self, ip_list, file_target_path, file_source):
return "dummy-job-instance-id"

def push_config_file(self, ip_list, file_target_path, file_list):
return "dummy-job-instance-id"

def fast_execute_script(self, ip_list, script_content, script_param, script_timeout=3000):
return "dummy-job-instance-id"

def get_task_result(self, job_instance_id):
return True, self.TASK_RESULT


# JobClient = DummyJobClient
# TODO 重定向到JOB V3,待后续Agent优化改造时统一使用JOBV3
storage = get_storage()
return storage.fast_transfer_file(
bk_biz_id=settings.BLUEKING_BIZ_ID,
task_name=task_name,
timeout=timeout,
account_alias=self.account,
file_target_path=file_target_path,
file_source_list=file_source_list,
target_server={"ip_list": ip_list},
)
8 changes: 1 addition & 7 deletions apps/backend/components/collections/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -925,13 +925,7 @@ def execute(self, data, parent_data):
# windows机器需要添加解压文件
if os_type == "windows":
files.extend(["7z.dll", "7z.exe"])
file_source = [
{
"files": [f"{nginx_path}/{file}" for file in files],
"account": "root",
"ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}],
}
]
file_source = [{"files": [f"{nginx_path}/{file}" for file in files]}]

data.inputs.file_source = file_source

Expand Down
8 changes: 1 addition & 7 deletions apps/backend/components/collections/bulk_job_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,7 @@ def execute(self, data, parent_data):
# windows机器需要添加解压文件
if os_type == "windows":
files.extend(["7z.dll", "7z.exe"])
file_source = [
{
"files": [f"{nginx_path}/{file}" for file in files],
"account": "root",
"ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}],
}
]
file_source = [{"files": [f"{nginx_path}/{file}" for file in files]}]

# 增加ip字段
host_info["ip"] = host_info["bk_host_innerip"]
Expand Down
26 changes: 9 additions & 17 deletions apps/backend/components/collections/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from apps.backend.api.constants import POLLING_INTERVAL, POLLING_TIMEOUT
from apps.backend.api.job import JobClient, process_parms
from apps.backend.components.collections.base import BaseService
from apps.core.files.storage import get_storage
from apps.exceptions import AppBaseException
from apps.node_man import constants
from apps.node_man.models import (
Expand Down Expand Up @@ -167,17 +168,17 @@ def request_single_job_and_create_map(
"account_alias": account_alias,
}
)
for index, file_source in enumerate(job_params.get("file_source_list", [])):
# 给文件源填充调用用户
job_params["file_source_list"][index]["account"] = {"alias": account_alias}

if "timeout" not in job_params:
# 设置默认超时时间
job_params["timeout"] = 300

if isinstance(subscription_instance_id, int):
subscription_instance_id = [subscription_instance_id]

try:
storage = get_storage()
job_params = storage.process_query_params(job_func, job_params)
# 请求作业平台
job_instance_id = job_func(job_params)["job_instance_id"]
except AppBaseException as err:
Expand All @@ -191,13 +192,10 @@ def request_single_job_and_create_map(
# 组装调用作业平台的日志
file_target_path = job_params.get("file_target_path")
if job_func == JobApi.fast_transfer_file:
source_ips = []
source_files = []
for file_source in job_params.get("file_source_list", []):
for server in file_source["server"]["ip_list"]:
source_ips.append(f"{server['bk_cloud_id']}-{server['ip']}")
source_files.extend(file_source["file_list"])
log = f"从 [{','.join(source_ips)}] 下发文件 [{','.join(source_files)}] 到目标机器路径 [{file_target_path}]"
log = storage.gen_transfer_file_log(
file_target_path=file_target_path, file_source_list=job_params.get("file_source_list")
)
# 节点管理仅使用 push_config_file 下发 content,不涉及从文件源读取文件
elif job_func == JobApi.push_config_file:
file_names = ",".join([file["file_name"] for file in job_params.get("file_list", [])])
log = f"下发配置文件 [{file_names}] 到目标机器路径 [{file_target_path}],若下发失败,请检查作业平台所部署的机器是否已安装AGENT"
Expand Down Expand Up @@ -571,13 +569,7 @@ def execute(self, data, parent_data):
self.logger.error(_("上传至Proxy的文件列表为空,请联系节点管理管理员配置接入点信息"))
return False

data.inputs.file_source = [
{
"files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files],
"account": "root",
"ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}],
}
]
data.inputs.file_source = [{"files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files]}]

return super(PushFileToProxyService, self).execute(data, parent_data)

Expand Down
11 changes: 2 additions & 9 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from apps.node_man import constants, exceptions, models
from apps.node_man.handlers.cmdb import CmdbHandler
from apps.utils.batch_request import request_multi_thread
from apps.utils.path_handler import PathHandler
from apps.utils.files import PathHandler
from common.api import GseApi, JobApi
from pipeline.component_framework.component import Component
from pipeline.core.flow import Service, StaticIntervalGenerator
Expand Down Expand Up @@ -415,14 +415,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
"subscription_id": common_data.subscription.id,
"job_params": {
"file_target_path": job["temp_path"],
"file_source_list": [
{
"file_list": file_list,
"server": {
"ip_list": [{"bk_cloud_id": settings.DEFAULT_CLOUD_ID, "ip": settings.BKAPP_NFS_IP}]
},
}
],
"file_source_list": [{"file_list": file_list}],
"os_type": job["os_type"],
"target_server": {"ip_list": job["ip_list"]},
},
Expand Down
3 changes: 2 additions & 1 deletion apps/backend/plugin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from apps.backend.subscription.handler import SubscriptionHandler
from apps.backend.subscription.tasks import run_subscription_task_and_create_instance
from apps.core.files import core_files_constants
from apps.core.files.storage import get_storage
from apps.exceptions import AppBaseException, ValidationError
from apps.generic import APIViewSet
Expand Down Expand Up @@ -767,7 +768,7 @@ def query_export_task(self, request):
download_url = ""
else:
# TODO: 此处后续需要提供一个统一的 storage.tmp_url(name) 方法,用于插件包的临时下载
if settings.STORAGE_TYPE in const.COS_TYPES:
if settings.STORAGE_TYPE in core_files_constants.StorageType.list_cos_member_values():
download_url = get_storage().url(record.file_path)
else:
download_url = "?".join([settings.BKAPP_NODEMAN_DOWNLOAD_API, record.download_params])
Expand Down
5 changes: 4 additions & 1 deletion apps/backend/tests/api/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,13 @@ def test_fast_execute_script(self):
assert task_result["pending"] == []
assert task_result["failed"] == []

@mock.patch(
"apps.core.files.base.JobApi.fast_transfer_file",
mock.MagicMock(return_value=FAST_PUSH_FILE_JOB_RESPONSE["data"]),
)
def test_fast_push_file(self):
job = JobClient(**self.JOB_INIT)
job.client.job.execute_platform_job = mock.MagicMock(return_value=self.FAST_PUSH_FILE_JOB_RESPONSE)
job.client.job.fast_push_file = mock.MagicMock(return_value=self.FAST_PUSH_FILE_JOB_RESPONSE)
job.client.job.get_job_instance_log = mock.MagicMock(return_value=self.FAST_PUSH_FILE_JOB_INSTANCE_LOG)

job_instance_id = job.fast_push_file(**self.FAST_PUSH_FILE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from django.test import TestCase
from mock import patch

from apps.backend.components.collections.agent import (
PushUpgradePackageComponent,
PushUpgradePackageService,
)
from apps.backend.tests.components.collections.agent import utils
from apps.backend.tests.components.collections.job import utils as job_utils
from apps.node_man import constants, models
from apps.utils.unittest.testcase import CustomBaseTestCase
from pipeline.component_framework.test import (
ComponentTestCase,
ComponentTestMixin,
Expand Down Expand Up @@ -45,14 +46,7 @@
"context": "test",
# 由执行业务逻辑填充,在此只是展示数据结构
"file_target_path": "/tmp/test",
"file_source": [
{
"files": ["/tmp/REGEX:[a-z]*.txt"],
"account": "root",
"ip_list": [{"bk_cloud_id": constants.DEFAULT_CLOUD, "ip": utils.TEST_IP}],
"custom_query_id": ["3"],
}
],
"file_source": [{"files": ["/tmp/REGEX:[a-z]*.txt"]}],
}
)

Expand All @@ -66,7 +60,7 @@ class PushUpgradePackageTestComponent(PushUpgradePackageComponent):
bound_service = PushUpgradePackageTestService


class PushUpgradePackageSuccessTest(TestCase, ComponentTestMixin):
class PushUpgradePackageSuccessTest(CustomBaseTestCase, ComponentTestMixin):
JOB_MOCK_CLIENT = utils.JobMockClient(
fast_push_file_return=utils.JOB_INSTANCE_ID_METHOD_RETURN,
get_job_instance_log_return=utils.JOB_GET_INSTANCE_LOG_RETURN,
Expand All @@ -81,8 +75,7 @@ def setUp(self):
self.job_version = patch(utils.JOB_VERSION_MOCK_PATH, "V3")
self.job_version.start()

def tearDown(self):
self.job_version.stop()
patch(job_utils.CORE_FILES_JOB_API_PATH, job_utils.JobV3MockApi()).start()

def component_cls(self):
return PushUpgradePackageTestComponent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import uuid
from copy import deepcopy

from django.test import TestCase
from mock import patch

from apps.backend.api.constants import OS, JobDataStatus, JobIPStatus
Expand All @@ -22,6 +21,7 @@
from apps.backend.tests.components.collections.agent import utils as agent_utils
from apps.backend.tests.components.collections.job import utils
from apps.node_man import constants, models
from apps.utils.unittest.testcase import CustomBaseTestCase
from pipeline.component_framework.test import (
ComponentTestCase,
ComponentTestMixin,
Expand Down Expand Up @@ -96,7 +96,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
return True


class JobBulkPushFileComponentTestCase(TestCase, ComponentTestMixin):
class JobBulkPushFileComponentTestCase(CustomBaseTestCase, ComponentTestMixin):

PUSH_FILE_RECORD_ID = 1

Expand All @@ -122,13 +122,7 @@ class JobBulkPushFileComponentTestCase(TestCase, ComponentTestMixin):
"job_client": JOB_CLIENT,
"host_info": {"ip": "127.0.0.3", "bk_supplier_id": 0, "bk_cloud_id": 0},
"file_target_path": "/data/",
"file_source": [
{
"files": ["/data/dev_pipeline_unit_test"],
"account": "root",
"ip_list": [{"bk_cloud_id": 0, "ip": "127.0.0.1", "bk_supplier_id": 0}],
}
],
"file_source": [{"files": ["/data/dev_pipeline_unit_test"]}],
}

POLLING_TIMEOUT_MOCK_PATH = "apps.backend.components.collections.bulk_job.TRIGGER_THRESHOLD"
Expand All @@ -140,6 +134,7 @@ def setUp(self):
fast_push_file_return=utils.JOB_EXECUTE_TASK_RETURN,
get_job_instance_log_return=JOB_GET_INSTANCE_LOG_RETURN,
)
patch(utils.CORE_FILES_JOB_API_PATH, utils.JobV3MockApi()).start()
patch(utils.JOB_VERSION_MOCK_PATH, "V3").start()
# 设置小阈值,直接触发批量分发
patch(self.POLLING_TIMEOUT_MOCK_PATH, 2).start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def setUp(self):
patch(utils.JOB_VERSION_MOCK_PATH, "V3").start()
# 设置小阈值,直接触发批量分发
patch(self.POLLING_TIMEOUT_MOCK_PATH, 2).start()
patch(utils.CORE_FILES_JOB_API_PATH, utils.JobV3MockApi()).start()

self.task_id = test_bulk_push_file.push_file_record_params["task_id"]
self.hash_ip_status_key = f"{self.REDIS_CACHE_PREFIX}:task_id:{self.task_id}:ip:status"
Expand Down
43 changes: 27 additions & 16 deletions apps/backend/tests/components/collections/job/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from typing import Dict

from mock import MagicMock

Expand All @@ -20,26 +21,12 @@ def api_success_return(data):

TASK_ID = 10000


class JobMockClient(object):
def __init__(
self,
push_config_file_return=None,
get_job_instance_log_return=None,
fast_execute_script_return=None,
fast_push_file_return=None,
):
self.job = MagicMock()
self.job.push_config_file = MagicMock(return_value=push_config_file_return)
self.job.get_job_instance_log = MagicMock(return_value=get_job_instance_log_return)
self.job.fast_execute_script = MagicMock(return_value=fast_execute_script_return)
self.job.fast_push_file = MagicMock(return_value=fast_push_file_return)


JOB_CLIENT_MOCK_PATH = "apps.backend.api.job.get_client_by_user"

JOB_VERSION_MOCK_PATH = "apps.backend.api.job.settings.JOB_VERSION"

CORE_FILES_JOB_API_PATH = "apps.core.files.base.JobApi"

JOB_EXECUTE_TASK_RETURN = {
"result": True,
"code": 0,
Expand Down Expand Up @@ -79,3 +66,27 @@ def __init__(
}
],
}


class JobMockClient(object):
def __init__(
self,
push_config_file_return=None,
get_job_instance_log_return=None,
fast_execute_script_return=None,
fast_push_file_return=None,
):
self.job = MagicMock()
self.job.push_config_file = MagicMock(return_value=push_config_file_return)
self.job.get_job_instance_log = MagicMock(return_value=get_job_instance_log_return)
self.job.fast_execute_script = MagicMock(return_value=fast_execute_script_return)
self.job.fast_push_file = MagicMock(return_value=fast_push_file_return)


# JobApi Mock
# 和 JobMockClient 的区别:1. 获取接口方式-client.job.api / JobApi.api 2.JobApi 仅返回 data
class JobV3MockApi:
fast_transfer_file_return = JOB_EXECUTE_TASK_RETURN["data"]

def __init__(self, fast_transfer_file_return: Dict = None):
self.fast_transfer_file = MagicMock(return_value=fast_transfer_file_return or self.fast_transfer_file_return)
Loading

0 comments on commit 5238b37

Please sign in to comment.