From 5238b3728d80327497270b49fbbaf2382afbdf3e Mon Sep 17 00:00:00 2001 From: crayon <873217631@qq.com> Date: Wed, 25 Aug 2021 16:10:02 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20=E4=BD=9C=E4=B8=9A=E5=B9=B3=E5=8F=B0?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=88=86=E5=8F=91=E6=94=AF=E6=8C=81=E5=88=B6?= =?UTF-8?q?=E5=93=81=E5=BA=93(#2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/api/job.py | 55 ++-- apps/backend/components/collections/agent.py | 8 +- .../components/collections/bulk_job_redis.py | 8 +- apps/backend/components/collections/job.py | 26 +- apps/backend/components/collections/plugin.py | 11 +- apps/backend/plugin/views.py | 3 +- apps/backend/tests/api/test_job.py | 5 +- .../agent/test_push_upgrade_package.py | 17 +- .../bulk_job/test_bulk_push_file.py | 13 +- .../bulk_job/test_job_bulk_push_file_v2.py | 1 + .../tests/components/collections/job/utils.py | 43 +-- .../plugin/test_transfer_package.py | 33 +-- .../path_handler.py => core/exceptions.py} | 16 +- apps/core/files/__init__.py | 5 + apps/core/files/admin.py | 26 ++ apps/core/files/apps.py | 16 ++ apps/core/files/base.py | 170 ++++++++++++ apps/core/files/constants.py | 90 +++++++ apps/core/files/exceptions.py | 38 +++ apps/core/files/file_source.py | 245 ++++++++++++++++++ apps/core/files/migrations/0001_initial.py | 53 ++++ apps/core/files/migrations/__init__.py | 10 + apps/core/files/models.py | 63 +++++ apps/core/files/storage.py | 103 +++++++- apps/node_man/constants.py | 1 - apps/node_man/handlers/plugin_v2.py | 3 +- .../management/commands/sync_plugin_status.py | 4 +- apps/utils/basic.py | 4 +- apps/utils/cache.py | 30 ++- apps/utils/enum.py | 66 +++++ apps/utils/files.py | 30 ++- common/api/modules/job.py | 12 + config/default.py | 11 +- 33 files changed, 1038 insertions(+), 181 deletions(-) rename apps/{utils/path_handler.py => core/exceptions.py} (65%) create mode 100644 apps/core/files/admin.py create mode 100644 apps/core/files/apps.py create mode 100644 apps/core/files/constants.py create mode 100644 apps/core/files/exceptions.py create mode 100644 apps/core/files/file_source.py create mode 100644 apps/core/files/migrations/0001_initial.py create mode 100644 apps/core/files/migrations/__init__.py create mode 100644 apps/core/files/models.py create mode 100644 apps/utils/enum.py diff --git a/apps/backend/api/job.py b/apps/backend/api/job.py index 77f060a1b..5b0d5fe4f 100644 --- a/apps/backend/api/job.py +++ b/apps/backend/api/job.py @@ -18,6 +18,8 @@ from blueapps.utils.esbclient import get_client_by_user from django.conf import settings +from apps.core.files.storage import get_storage + from .constants import ( ACCOUNT_MAP, POLLING_INTERVAL, @@ -286,44 +288,19 @@ def fast_push_file( :param timeout: 超时时间 :return: Job任务id """ - params = { - "bk_biz_id": settings.BLUEKING_BIZ_ID, - "task_name": task_name, - "timeout": timeout, - "file_target_path": file_target_path, - "file_source": file_source, - "ip_list": ip_list, - "account": self.account, - "bk_username": self.username, - } - response = self.client.job.fast_push_file(params) - data = self._response_exception_filter("client.job.fast_push_file", params, response) - - return data["job_instance_id"] + file_source_list = [] + for file_source_item in file_source: + file_source_list.append({"file_list": file_source_item.get("files", [])}) -class DummyJobClient(JobClient): - """ - dummy job client - """ - - TASK_RESULT = { - "success": [{"ip": "127.0.0.1", "bk_cloud_id": 0, "log_content": "dummy-job-log-content"}], - "pending": [], - "failed": [], - } - - def fast_push_file(self, ip_list, file_target_path, file_source): - return "dummy-job-instance-id" - - def push_config_file(self, ip_list, file_target_path, file_list): - return "dummy-job-instance-id" - - def fast_execute_script(self, ip_list, script_content, script_param, script_timeout=3000): - return "dummy-job-instance-id" - - def get_task_result(self, job_instance_id): - return True, self.TASK_RESULT - - -# JobClient = DummyJobClient + # TODO 重定向到JOB V3,待后续Agent优化改造时统一使用JOBV3 + storage = get_storage() + return storage.fast_transfer_file( + bk_biz_id=settings.BLUEKING_BIZ_ID, + task_name=task_name, + timeout=timeout, + account_alias=self.account, + file_target_path=file_target_path, + file_source_list=file_source_list, + target_server={"ip_list": ip_list}, + ) diff --git a/apps/backend/components/collections/agent.py b/apps/backend/components/collections/agent.py index a76743ee5..295263d75 100644 --- a/apps/backend/components/collections/agent.py +++ b/apps/backend/components/collections/agent.py @@ -925,13 +925,7 @@ def execute(self, data, parent_data): # windows机器需要添加解压文件 if os_type == "windows": files.extend(["7z.dll", "7z.exe"]) - file_source = [ - { - "files": [f"{nginx_path}/{file}" for file in files], - "account": "root", - "ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}], - } - ] + file_source = [{"files": [f"{nginx_path}/{file}" for file in files]}] data.inputs.file_source = file_source diff --git a/apps/backend/components/collections/bulk_job_redis.py b/apps/backend/components/collections/bulk_job_redis.py index 9013e9550..87c801bd6 100644 --- a/apps/backend/components/collections/bulk_job_redis.py +++ b/apps/backend/components/collections/bulk_job_redis.py @@ -427,13 +427,7 @@ def execute(self, data, parent_data): # windows机器需要添加解压文件 if os_type == "windows": files.extend(["7z.dll", "7z.exe"]) - file_source = [ - { - "files": [f"{nginx_path}/{file}" for file in files], - "account": "root", - "ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}], - } - ] + file_source = [{"files": [f"{nginx_path}/{file}" for file in files]}] # 增加ip字段 host_info["ip"] = host_info["bk_host_innerip"] diff --git a/apps/backend/components/collections/job.py b/apps/backend/components/collections/job.py index 6bf3e1df6..aadeea21d 100644 --- a/apps/backend/components/collections/job.py +++ b/apps/backend/components/collections/job.py @@ -23,6 +23,7 @@ from apps.backend.api.constants import POLLING_INTERVAL, POLLING_TIMEOUT from apps.backend.api.job import JobClient, process_parms from apps.backend.components.collections.base import BaseService +from apps.core.files.storage import get_storage from apps.exceptions import AppBaseException from apps.node_man import constants from apps.node_man.models import ( @@ -167,9 +168,6 @@ def request_single_job_and_create_map( "account_alias": account_alias, } ) - for index, file_source in enumerate(job_params.get("file_source_list", [])): - # 给文件源填充调用用户 - job_params["file_source_list"][index]["account"] = {"alias": account_alias} if "timeout" not in job_params: # 设置默认超时时间 @@ -177,7 +175,10 @@ def request_single_job_and_create_map( if isinstance(subscription_instance_id, int): subscription_instance_id = [subscription_instance_id] + try: + storage = get_storage() + job_params = storage.process_query_params(job_func, job_params) # 请求作业平台 job_instance_id = job_func(job_params)["job_instance_id"] except AppBaseException as err: @@ -191,13 +192,10 @@ def request_single_job_and_create_map( # 组装调用作业平台的日志 file_target_path = job_params.get("file_target_path") if job_func == JobApi.fast_transfer_file: - source_ips = [] - source_files = [] - for file_source in job_params.get("file_source_list", []): - for server in file_source["server"]["ip_list"]: - source_ips.append(f"{server['bk_cloud_id']}-{server['ip']}") - source_files.extend(file_source["file_list"]) - log = f"从 [{','.join(source_ips)}] 下发文件 [{','.join(source_files)}] 到目标机器路径 [{file_target_path}]" + log = storage.gen_transfer_file_log( + file_target_path=file_target_path, file_source_list=job_params.get("file_source_list") + ) + # 节点管理仅使用 push_config_file 下发 content,不涉及从文件源读取文件 elif job_func == JobApi.push_config_file: file_names = ",".join([file["file_name"] for file in job_params.get("file_list", [])]) log = f"下发配置文件 [{file_names}] 到目标机器路径 [{file_target_path}],若下发失败,请检查作业平台所部署的机器是否已安装AGENT" @@ -571,13 +569,7 @@ def execute(self, data, parent_data): self.logger.error(_("上传至Proxy的文件列表为空,请联系节点管理管理员配置接入点信息")) return False - data.inputs.file_source = [ - { - "files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files], - "account": "root", - "ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}], - } - ] + data.inputs.file_source = [{"files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files]}] return super(PushFileToProxyService, self).execute(data, parent_data) diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index e3aaf63c2..86b13191b 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -39,7 +39,7 @@ from apps.node_man import constants, exceptions, models from apps.node_man.handlers.cmdb import CmdbHandler from apps.utils.batch_request import request_multi_thread -from apps.utils.path_handler import PathHandler +from apps.utils.files import PathHandler from common.api import GseApi, JobApi from pipeline.component_framework.component import Component from pipeline.core.flow import Service, StaticIntervalGenerator @@ -415,14 +415,7 @@ def _execute(self, data, parent_data, common_data: CommonData): "subscription_id": common_data.subscription.id, "job_params": { "file_target_path": job["temp_path"], - "file_source_list": [ - { - "file_list": file_list, - "server": { - "ip_list": [{"bk_cloud_id": settings.DEFAULT_CLOUD_ID, "ip": settings.BKAPP_NFS_IP}] - }, - } - ], + "file_source_list": [{"file_list": file_list}], "os_type": job["os_type"], "target_server": {"ip_list": job["ip_list"]}, }, diff --git a/apps/backend/plugin/views.py b/apps/backend/plugin/views.py index b710614db..5af553a15 100644 --- a/apps/backend/plugin/views.py +++ b/apps/backend/plugin/views.py @@ -47,6 +47,7 @@ ) from apps.backend.subscription.handler import SubscriptionHandler from apps.backend.subscription.tasks import run_subscription_task_and_create_instance +from apps.core.files import core_files_constants from apps.core.files.storage import get_storage from apps.exceptions import AppBaseException, ValidationError from apps.generic import APIViewSet @@ -767,7 +768,7 @@ def query_export_task(self, request): download_url = "" else: # TODO: 此处后续需要提供一个统一的 storage.tmp_url(name) 方法,用于插件包的临时下载 - if settings.STORAGE_TYPE in const.COS_TYPES: + if settings.STORAGE_TYPE in core_files_constants.StorageType.list_cos_member_values(): download_url = get_storage().url(record.file_path) else: download_url = "?".join([settings.BKAPP_NODEMAN_DOWNLOAD_API, record.download_params]) diff --git a/apps/backend/tests/api/test_job.py b/apps/backend/tests/api/test_job.py index e7ebd279d..25df0845d 100644 --- a/apps/backend/tests/api/test_job.py +++ b/apps/backend/tests/api/test_job.py @@ -220,10 +220,13 @@ def test_fast_execute_script(self): assert task_result["pending"] == [] assert task_result["failed"] == [] + @mock.patch( + "apps.core.files.base.JobApi.fast_transfer_file", + mock.MagicMock(return_value=FAST_PUSH_FILE_JOB_RESPONSE["data"]), + ) def test_fast_push_file(self): job = JobClient(**self.JOB_INIT) job.client.job.execute_platform_job = mock.MagicMock(return_value=self.FAST_PUSH_FILE_JOB_RESPONSE) - job.client.job.fast_push_file = mock.MagicMock(return_value=self.FAST_PUSH_FILE_JOB_RESPONSE) job.client.job.get_job_instance_log = mock.MagicMock(return_value=self.FAST_PUSH_FILE_JOB_INSTANCE_LOG) job_instance_id = job.fast_push_file(**self.FAST_PUSH_FILE) diff --git a/apps/backend/tests/components/collections/agent/test_push_upgrade_package.py b/apps/backend/tests/components/collections/agent/test_push_upgrade_package.py index 760d96250..8d8d98a49 100644 --- a/apps/backend/tests/components/collections/agent/test_push_upgrade_package.py +++ b/apps/backend/tests/components/collections/agent/test_push_upgrade_package.py @@ -8,7 +8,6 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from django.test import TestCase from mock import patch from apps.backend.components.collections.agent import ( @@ -16,7 +15,9 @@ PushUpgradePackageService, ) from apps.backend.tests.components.collections.agent import utils +from apps.backend.tests.components.collections.job import utils as job_utils from apps.node_man import constants, models +from apps.utils.unittest.testcase import CustomBaseTestCase from pipeline.component_framework.test import ( ComponentTestCase, ComponentTestMixin, @@ -45,14 +46,7 @@ "context": "test", # 由执行业务逻辑填充,在此只是展示数据结构 "file_target_path": "/tmp/test", - "file_source": [ - { - "files": ["/tmp/REGEX:[a-z]*.txt"], - "account": "root", - "ip_list": [{"bk_cloud_id": constants.DEFAULT_CLOUD, "ip": utils.TEST_IP}], - "custom_query_id": ["3"], - } - ], + "file_source": [{"files": ["/tmp/REGEX:[a-z]*.txt"]}], } ) @@ -66,7 +60,7 @@ class PushUpgradePackageTestComponent(PushUpgradePackageComponent): bound_service = PushUpgradePackageTestService -class PushUpgradePackageSuccessTest(TestCase, ComponentTestMixin): +class PushUpgradePackageSuccessTest(CustomBaseTestCase, ComponentTestMixin): JOB_MOCK_CLIENT = utils.JobMockClient( fast_push_file_return=utils.JOB_INSTANCE_ID_METHOD_RETURN, get_job_instance_log_return=utils.JOB_GET_INSTANCE_LOG_RETURN, @@ -81,8 +75,7 @@ def setUp(self): self.job_version = patch(utils.JOB_VERSION_MOCK_PATH, "V3") self.job_version.start() - def tearDown(self): - self.job_version.stop() + patch(job_utils.CORE_FILES_JOB_API_PATH, job_utils.JobV3MockApi()).start() def component_cls(self): return PushUpgradePackageTestComponent diff --git a/apps/backend/tests/components/collections/bulk_job/test_bulk_push_file.py b/apps/backend/tests/components/collections/bulk_job/test_bulk_push_file.py index 38e48706f..06e8b6bc6 100644 --- a/apps/backend/tests/components/collections/bulk_job/test_bulk_push_file.py +++ b/apps/backend/tests/components/collections/bulk_job/test_bulk_push_file.py @@ -11,7 +11,6 @@ import uuid from copy import deepcopy -from django.test import TestCase from mock import patch from apps.backend.api.constants import OS, JobDataStatus, JobIPStatus @@ -22,6 +21,7 @@ from apps.backend.tests.components.collections.agent import utils as agent_utils from apps.backend.tests.components.collections.job import utils from apps.node_man import constants, models +from apps.utils.unittest.testcase import CustomBaseTestCase from pipeline.component_framework.test import ( ComponentTestCase, ComponentTestMixin, @@ -96,7 +96,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): return True -class JobBulkPushFileComponentTestCase(TestCase, ComponentTestMixin): +class JobBulkPushFileComponentTestCase(CustomBaseTestCase, ComponentTestMixin): PUSH_FILE_RECORD_ID = 1 @@ -122,13 +122,7 @@ class JobBulkPushFileComponentTestCase(TestCase, ComponentTestMixin): "job_client": JOB_CLIENT, "host_info": {"ip": "127.0.0.3", "bk_supplier_id": 0, "bk_cloud_id": 0}, "file_target_path": "/data/", - "file_source": [ - { - "files": ["/data/dev_pipeline_unit_test"], - "account": "root", - "ip_list": [{"bk_cloud_id": 0, "ip": "127.0.0.1", "bk_supplier_id": 0}], - } - ], + "file_source": [{"files": ["/data/dev_pipeline_unit_test"]}], } POLLING_TIMEOUT_MOCK_PATH = "apps.backend.components.collections.bulk_job.TRIGGER_THRESHOLD" @@ -140,6 +134,7 @@ def setUp(self): fast_push_file_return=utils.JOB_EXECUTE_TASK_RETURN, get_job_instance_log_return=JOB_GET_INSTANCE_LOG_RETURN, ) + patch(utils.CORE_FILES_JOB_API_PATH, utils.JobV3MockApi()).start() patch(utils.JOB_VERSION_MOCK_PATH, "V3").start() # 设置小阈值,直接触发批量分发 patch(self.POLLING_TIMEOUT_MOCK_PATH, 2).start() diff --git a/apps/backend/tests/components/collections/bulk_job/test_job_bulk_push_file_v2.py b/apps/backend/tests/components/collections/bulk_job/test_job_bulk_push_file_v2.py index 2189ec8a8..d0632d2c9 100644 --- a/apps/backend/tests/components/collections/bulk_job/test_job_bulk_push_file_v2.py +++ b/apps/backend/tests/components/collections/bulk_job/test_job_bulk_push_file_v2.py @@ -47,6 +47,7 @@ def setUp(self): patch(utils.JOB_VERSION_MOCK_PATH, "V3").start() # 设置小阈值,直接触发批量分发 patch(self.POLLING_TIMEOUT_MOCK_PATH, 2).start() + patch(utils.CORE_FILES_JOB_API_PATH, utils.JobV3MockApi()).start() self.task_id = test_bulk_push_file.push_file_record_params["task_id"] self.hash_ip_status_key = f"{self.REDIS_CACHE_PREFIX}:task_id:{self.task_id}:ip:status" diff --git a/apps/backend/tests/components/collections/job/utils.py b/apps/backend/tests/components/collections/job/utils.py index 45856088a..4e4ae4abc 100644 --- a/apps/backend/tests/components/collections/job/utils.py +++ b/apps/backend/tests/components/collections/job/utils.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +from typing import Dict from mock import MagicMock @@ -20,26 +21,12 @@ def api_success_return(data): TASK_ID = 10000 - -class JobMockClient(object): - def __init__( - self, - push_config_file_return=None, - get_job_instance_log_return=None, - fast_execute_script_return=None, - fast_push_file_return=None, - ): - self.job = MagicMock() - self.job.push_config_file = MagicMock(return_value=push_config_file_return) - self.job.get_job_instance_log = MagicMock(return_value=get_job_instance_log_return) - self.job.fast_execute_script = MagicMock(return_value=fast_execute_script_return) - self.job.fast_push_file = MagicMock(return_value=fast_push_file_return) - - JOB_CLIENT_MOCK_PATH = "apps.backend.api.job.get_client_by_user" JOB_VERSION_MOCK_PATH = "apps.backend.api.job.settings.JOB_VERSION" +CORE_FILES_JOB_API_PATH = "apps.core.files.base.JobApi" + JOB_EXECUTE_TASK_RETURN = { "result": True, "code": 0, @@ -79,3 +66,27 @@ def __init__( } ], } + + +class JobMockClient(object): + def __init__( + self, + push_config_file_return=None, + get_job_instance_log_return=None, + fast_execute_script_return=None, + fast_push_file_return=None, + ): + self.job = MagicMock() + self.job.push_config_file = MagicMock(return_value=push_config_file_return) + self.job.get_job_instance_log = MagicMock(return_value=get_job_instance_log_return) + self.job.fast_execute_script = MagicMock(return_value=fast_execute_script_return) + self.job.fast_push_file = MagicMock(return_value=fast_push_file_return) + + +# JobApi Mock +# 和 JobMockClient 的区别:1. 获取接口方式-client.job.api / JobApi.api 2.JobApi 仅返回 data +class JobV3MockApi: + fast_transfer_file_return = JOB_EXECUTE_TASK_RETURN["data"] + + def __init__(self, fast_transfer_file_return: Dict = None): + self.fast_transfer_file = MagicMock(return_value=fast_transfer_file_return or self.fast_transfer_file_return) diff --git a/apps/backend/tests/components/collections/plugin/test_transfer_package.py b/apps/backend/tests/components/collections/plugin/test_transfer_package.py index bb1cb56f9..a62a1bc6f 100644 --- a/apps/backend/tests/components/collections/plugin/test_transfer_package.py +++ b/apps/backend/tests/components/collections/plugin/test_transfer_package.py @@ -10,10 +10,10 @@ """ from unittest.mock import patch -from django.test import TestCase - from apps.backend.components.collections.plugin import TransferPackageComponent +from apps.backend.tests.components.collections.job import utils as job_utils from apps.backend.tests.components.collections.plugin import utils +from apps.utils.unittest.testcase import CustomBaseTestCase from pipeline.component_framework.test import ( ComponentTestCase, ComponentTestMixin, @@ -22,7 +22,7 @@ ) -class TransferPackageTest(TestCase, ComponentTestMixin): +class TransferPackageTest(CustomBaseTestCase, ComponentTestMixin): def setUp(self): self.ids = utils.PluginTestObjFactory.init_db() self.COMMON_INPUTS = utils.PluginTestObjFactory.inputs( @@ -35,25 +35,14 @@ def setUp(self): # 主机信息保持和默认一致 instance_info_attr_values={}, ) - self.cmdb_client = patch(utils.CMDB_CLIENT_MOCK_PATH, utils.CmdbClient) - self.plugin_client = patch(utils.PLUGIN_CLIENT_MOCK_PATH, utils.JobMockClient) - self.plugin_multi_thread = patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.job_jobapi = patch(utils.JOB_JOBAPI, utils.JobMockClient) - self.job_multi_thread = patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client) - self.plugin_gseapi = patch(utils.PLUGIN_GSEAPI, utils.GseMockClient) - - self.cmdb_client.start() - self.plugin_client.start() - self.plugin_multi_thread.start() - self.job_jobapi.start() - self.job_multi_thread.start() + patch(utils.CMDB_CLIENT_MOCK_PATH, utils.CmdbClient).start() + patch(utils.PLUGIN_MULTI_THREAD_PATH, utils.request_multi_thread_client).start() + patch(utils.JOB_MULTI_THREAD_PATH, utils.request_multi_thread_client).start() + patch(utils.PLUGIN_GSEAPI, utils.GseMockClient).start() - def tearDown(self): - self.cmdb_client.stop() - self.plugin_client.stop() - self.plugin_multi_thread.stop() - self.job_jobapi.stop() - self.job_multi_thread.stop() + patch(utils.JOB_JOBAPI, utils.JobMockClient).start() + patch(utils.PLUGIN_CLIENT_MOCK_PATH, utils.JobMockClient).start() + patch(job_utils.CORE_FILES_JOB_API_PATH, utils.JobMockClient).start() def component_cls(self): return TransferPackageComponent @@ -61,7 +50,7 @@ def component_cls(self): def cases(self): return [ ComponentTestCase( - name="测试文件分发", + name="测试插件包分发", inputs=self.COMMON_INPUTS, parent_data={}, execute_assertion=ExecuteAssertion( diff --git a/apps/utils/path_handler.py b/apps/core/exceptions.py similarity index 65% rename from apps/utils/path_handler.py rename to apps/core/exceptions.py index 557ced515..e6302483d 100644 --- a/apps/utils/path_handler.py +++ b/apps/core/exceptions.py @@ -8,19 +8,9 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import ntpath -import posixpath -from apps.node_man import constants +from apps.exceptions import AppBaseException -class PathHandler(object): - def __init__(self, os_type): - self.os_type = os_type.lower() - if os_type == constants.OsType.WINDOWS: - self.path_handler = ntpath - else: - self.path_handler = posixpath - - def __getattr__(self, item): - return getattr(self.path_handler, item) +class CoreBaseException(AppBaseException): + MODULE_CODE = 3000 diff --git a/apps/core/files/__init__.py b/apps/core/files/__init__.py index b402ee3b4..a65678703 100644 --- a/apps/core/files/__init__.py +++ b/apps/core/files/__init__.py @@ -8,3 +8,8 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ + + +from . import constants + +core_files_constants = constants diff --git a/apps/core/files/admin.py b/apps/core/files/admin.py new file mode 100644 index 000000000..08e99e91d --- /dev/null +++ b/apps/core/files/admin.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.contrib import admin + +from . import models + + +@admin.register(models.BKJobFileCredential) +class BKJobFileCredentialAdmin(admin.ModelAdmin): + list_display = ["id", "bk_biz_id", "storage_type", "name", "type", "description", "credential_id"] + search_fields = ["id", "bk_biz_id", "storage_type", "name", "type", "credential_id"] + + +@admin.register(models.BKJobFileSource) +class BKJobFileSourceAdmin(admin.ModelAdmin): + list_display = ["credential_id", "file_source_id", "code"] + search_fields = ["credential_id", "file_source_id", "code"] diff --git a/apps/core/files/apps.py b/apps/core/files/apps.py new file mode 100644 index 000000000..1da8f237d --- /dev/null +++ b/apps/core/files/apps.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.apps import AppConfig + + +class CoreFilesConfig(AppConfig): + name = "apps.core.files" diff --git a/apps/core/files/base.py b/apps/core/files/base.py index 60c7f294e..49dddc3c9 100644 --- a/apps/core/files/base.py +++ b/apps/core/files/base.py @@ -9,10 +9,19 @@ specific language governing permissions and limitations under the License. """ import os +from abc import ABC, abstractmethod +from copy import deepcopy +from typing import Any, Callable, Dict, List, Union from django.conf import settings from django.core.exceptions import SuspiciousFileOperation +from django.core.files.storage import Storage from django.utils.crypto import get_random_string +from django.utils.translation import ugettext_lazy as _ + +from common.api import JobApi + +from . import constants, exceptions, models class StorageFileOverwriteMixin: @@ -50,3 +59,164 @@ def _gen_random_name(_file_root) -> str: ) name = name if self.file_overwrite else _gen_random_name(file_root) return name + + +class BkJobMixin: + @abstractmethod + def _handle_file_source_list( + self, file_source_list: List[Dict[str, Any]], extra_transfer_file_params: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """ + 预处理源文件列表,添加文件源等信息 + :param file_source_list: 源文件 + :param extra_transfer_file_params: transfer_files 的其他参数 + :return: 源文件对象列表 + """ + raise NotImplementedError + + @classmethod + def gen_transfer_file_log(cls, file_target_path: str, file_source_list: List[Dict[str, Any]]) -> str: + """ + 生成文件分发日志 + :param file_target_path: 文件分发目标路径 + :param file_source_list: 源文件列表 + :return: 日志字符串 + """ + + third_part_file_source_ids: List[str] = [ + file_source["file_source_id"] + for file_source in file_source_list + if file_source.get("file_type") == constants.JobFileType.THIRD_PART.value + ] + + # 获取第三方文件源信息,判空避免无效IO + if third_part_file_source_ids: + third_part_file_source_infos = models.BKJobFileSource.objects.filter( + file_source_id__in=third_part_file_source_ids + ).values("file_source_id", "alias") + file_source_id__info_map = { + third_part_file_source_info["file_source_id"]: third_part_file_source_info + for third_part_file_source_info in third_part_file_source_infos + } + else: + file_source_id__info_map = {} + + # 源文件分发日志 + files_transfer_log_list: List[str] = [] + # 遍历源文件分发列表,一般来说该列表长度为1 + for file_source in file_source_list: + file_type = file_source.get("file_type", constants.JobFileType.SERVER.value) + file_type_alias = constants.JobFileType.get_member_value__alias_map().get(file_type, _("未知文件源")) + + if file_type == constants.JobFileType.THIRD_PART.value: + source_info_str = _("{file_type_alias}-{file_source_alias}").format( + file_type_alias=file_type_alias, + file_source_alias=file_source_id__info_map[file_source["file_source_id"]]["alias"], + ) + elif file_type == constants.JobFileType.SERVER.value: + server_ip_str_set = { + f"{ip_info['bk_cloud_id']}-{ip_info['ip']}" for ip_info in file_source["server"]["ip_list"] + } + source_info_str = _("{file_type_alias}-{server_ips_str}").format( + file_type_alias=file_type_alias, server_ips_str=",".join(server_ip_str_set) + ) + else: + source_info_str = _("未知文件源-{file_type}").format(file_type=file_type) + + files_transfer_log_list.append( + _("从 [{source_info_str}] 下发文件 [{file_list_str}] 到目标机器路径 [{file_target_path}]").format( + source_info_str=source_info_str, + file_list_str=",".join(file_source.get("file_list")), + file_target_path=file_target_path, + ) + ) + + return "\n".join(files_transfer_log_list) + + def process_query_params( + self, job_api_func: Callable[[Dict[str, Any]], Dict[str, Any]], query_params: Dict[str, Any] + ) -> Dict[str, Any]: + """ + 预处理请求参数 + :param job_api_func: JobApi method + :param query_params: 请求参数 + :return: 预处理后的参数 + """ + # 如果后续预处理的job api增多,考虑拆分逻辑 + if job_api_func == JobApi.fast_transfer_file: + query_params = deepcopy(query_params) + query_params["file_source_list"] = self._handle_file_source_list( + file_source_list=query_params.get("file_source_list", []), extra_transfer_file_params=query_params + ) + return query_params + + def fast_transfer_file( + self, + bk_biz_id: int, + task_name: str, + timeout: int, + account_alias: str, + file_target_path: str, + file_source_list: List[Dict[str, Any]], + target_server: Dict[str, List[Dict[str, Union[str, int]]]], + **kwargs, + ) -> str: + """ + 分发文件 + :param bk_biz_id: 业务ID + :param task_name: 任务名称 + :param timeout: 超时时间 + :param account_alias: 目标执行账号别名 + :param file_target_path: 文件目标路径 + :param file_source_list: 源文件路径列表 + :param kwargs: 额外的调用参数 + :param target_server: 目标服务器 + 可选: + 1. ip_list - 静态IP列表 + bk_cloud_id - 云区域ID + ip - IP地址 + 2. dynamic_group_list - 动态分组ID列表 + 3. topo_node_list - 动态topo节点列表 + id - 动态topo节点ID,对应CMDB API 中的 bk_inst_id + node_type - 动态topo节点类型,对应CMDB API 中的 bk_obj_id,比如"module","set" + target_server 示例 + { + "ip_list": [ + {"bk_cloud_id": 0, "ip": "127.0.0.1"}, + {"bk_cloud_id": 0, "ip": "127.0.0.2"} + ] + } + :return: 作业实例ID + """ + + base_transfer_file_params = deepcopy( + { + "bk_biz_id": bk_biz_id, + "task_name": task_name, + "timeout": timeout, + "account_alias": account_alias, + "file_target_path": file_target_path, + "target_server": target_server, + **kwargs, + } + ) + + # 根据文件源对文件路径列表进行处理,并返回源文件对象数组 + file_source_list = self._handle_file_source_list( + file_source_list=deepcopy(file_source_list), extra_transfer_file_params=base_transfer_file_params + ) + + try: + job_instance_id = JobApi.fast_transfer_file( + {"file_source_list": file_source_list, **base_transfer_file_params} + )["job_instance_id"] + + except Exception as err: + # 捕获异常并抛出当前层级的异常 + raise exceptions.FilesTransferError(_("文件分发错误:err_msg -> {err_msg}").format(err_msg=err)) + + return job_instance_id + + +class BaseStorage(StorageFileOverwriteMixin, BkJobMixin, Storage, ABC): + storage_type: str = None diff --git a/apps/core/files/constants.py b/apps/core/files/constants.py new file mode 100644 index 000000000..93182b0ce --- /dev/null +++ b/apps/core/files/constants.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from enum import Enum +from typing import Dict, List + +from django.utils.translation import ugettext_lazy as _ + +from apps.utils.cache import class_member_cache +from apps.utils.enum import EnhanceEnum +from config.default import StorageType as DefaultStorageType + + +class CosBucketEnum(EnhanceEnum): + """对象存储仓库枚举""" + + PUBLIC = "public" + PRIVATE = "private" + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return {cls.PRIVATE: _("私有仓库"), cls.PUBLIC: _("公共仓库")} + + +class JobFileType(EnhanceEnum): + """作业平台源文件类型""" + + SERVER = 1 + THIRD_PART = 3 + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return {cls.SERVER: _("服务器文件"), cls.THIRD_PART: _("第三方文件源文件")} + + +class StorageType(EnhanceEnum): + """文件存储类型枚举 + 为了避免循环引用,在default中维护DefaultStorageType + 不允许继承已有枚举成员的Enum,故直接赋值,注意更改DefaultStorageType需要同步 + 参考:https://stackoverflow.com/questions/33679930/how-to-extend-python-enum + """ + + BLUEKING_ARTIFACTORY = DefaultStorageType.BLUEKING_ARTIFACTORY.value + FILE_SYSTEM = DefaultStorageType.FILE_SYSTEM.value + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return {cls.BLUEKING_ARTIFACTORY: _("蓝鲸制品库"), cls.FILE_SYSTEM: _("本地文件系统")} + + @classmethod + @class_member_cache() + def list_cos_member_values(cls) -> List[str]: + """列举属于对象存储类型""" + return [cls.BLUEKING_ARTIFACTORY.value] + + @classmethod + @class_member_cache() + def get_member_value__job_file_type_map(cls) -> Dict[str, int]: + """ + 获取文件存储类型 - JOB源文件类型映射 + """ + return { + cls.BLUEKING_ARTIFACTORY.value: JobFileType.THIRD_PART.value, + cls.FILE_SYSTEM.value: JobFileType.SERVER.value, + } + + +class FileCredentialType(EnhanceEnum): + """文件凭证类型""" + + SECRET_KEY = "SECRET_KEY" + ACCESS_KEY_SECRET_KEY = "ACCESS_KEY_SECRET_KEY" + PASSWORD = "PASSWORD" + USERNAME_PASSWORD = "USERNAME_PASSWORD" + + @classmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + return { + cls.SECRET_KEY: cls.SECRET_KEY.value, + cls.ACCESS_KEY_SECRET_KEY: cls.ACCESS_KEY_SECRET_KEY.value, + cls.PASSWORD: cls.PASSWORD.value, + cls.USERNAME_PASSWORD: cls.USERNAME_PASSWORD.value, + } diff --git a/apps/core/files/exceptions.py b/apps/core/files/exceptions.py new file mode 100644 index 000000000..f5f288a34 --- /dev/null +++ b/apps/core/files/exceptions.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.utils.translation import ugettext_lazy as _ + +from apps.core.exceptions import CoreBaseException + + +class FilesBaseException(CoreBaseException): + MODULE_CODE = 3001 + + +class FilesRegisterCredentialError(CoreBaseException): + MESSAGE = _("注册凭证失败") + ERROR_CODE = 1 + + +class FilesStorageTypeError(CoreBaseException): + MESSAGE = _("文件源类型错误") + ERROR_CODE = 2 + + +class FilesRegisterFileSourceError(CoreBaseException): + MESSAGE = _("注册文件失败") + ERROR_CODE = 3 + + +class FilesTransferError(CoreBaseException): + MESSAGE = _("文件传输失败") + ERROR_CODE = 4 diff --git a/apps/core/files/file_source.py b/apps/core/files/file_source.py new file mode 100644 index 000000000..125e00c72 --- /dev/null +++ b/apps/core/files/file_source.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from typing import Any, Dict, Optional + +from django.utils.translation import ugettext_lazy as _ + +from common.api import JobApi + +from . import constants, exceptions, models + + +def get_valid_storage_alias(storage_type: str) -> str: + storage_type_alias = constants.StorageType.get_member_value__alias_map().get(storage_type) + if storage_type_alias is None: + raise exceptions.FilesStorageTypeError( + _("storage_type must be one of {choices}").format(choices={constants.StorageType.list_choices()}) + ) + return storage_type_alias + + +class BkJobFileCredentialManager: + # 文件凭证名称 + CREDENTIAL_NAME_TMPL: str = "NODEMAN_{storage_type}" + CREDENTIAL_DESCRIPTION_TMPL = _("节点管理[biz:{bk_biz_id}]{storage_type_alias}文件凭证") + + @classmethod + def gen_credential_name(cls, storage_type: str) -> str: + """ + 生成凭证名称 + :param storage_type: 文件源类型 + :return: 凭证名称 + """ + return cls.CREDENTIAL_NAME_TMPL.format(storage_type=storage_type) + + @classmethod + def gen_credential_description(cls, bk_biz_id: int, storage_type: str) -> str: + """ + 生成凭证描述 + :param bk_biz_id: 业务ID + :param storage_type: 文件源类型 + :return: 凭证描述 + """ + return cls.CREDENTIAL_DESCRIPTION_TMPL.format( + bk_biz_id=bk_biz_id, storage_type_alias=get_valid_storage_alias(storage_type=storage_type) + ) + + @classmethod + def register_credential( + cls, + bk_biz_id: int, + storage_type: str, + credential_type: str, + credential_auth_info: Dict[str, str], + ) -> models.BKJobFileCredential: + """ + 注册文件凭证 + :param bk_biz_id: 业务ID + :param storage_type: 文件源类型,参考 constants.StorageType + :param credential_type: 凭证认证类型 + :param credential_auth_info: 凭证认证信息 + credential_access_key 凭证类型为ACCESS_KEY_SECRET_KEY时填写 + credential_secret_key 凭证类型为 ACCESS_KEY_SECRET_KEY / SECRET_KEY 时填写 + credential_username 凭证类型为 USERNAME_PASSWORD 时填写 + credential_password 凭证类型为 USERNAME_PASSWORD / PASSWORD 时填写 + :return: models.BKJobFileCredential + """ + + name = cls.gen_credential_name(storage_type=storage_type) + description = cls.gen_credential_description(bk_biz_id=bk_biz_id, storage_type=storage_type) + + # 交给作业平台执行参数校验,无需冗余校验 + create_credential_query_params = { + "bk_biz_id": bk_biz_id, + "type": credential_type, + "name": name, + "description": description, + **credential_auth_info, + } + try: + credential_id = JobApi.create_credential(create_credential_query_params)["id"] + except Exception as err: + raise exceptions.FilesRegisterCredentialError( + _("注册凭证失败:bk_biz_id -> {bk_biz_id}, err_msg -> {err_msg}").format(bk_biz_id=bk_biz_id, err_msg=err) + ) + + # 查询 bk_biz_id & storage_type 是否已存在注册凭证,有则更新,无则创建 + credential_obj, __ = models.BKJobFileCredential.objects.update_or_create( + bk_biz_id=bk_biz_id, + storage_type=storage_type, + defaults={ + "name": name, + "type": credential_type, + "credential_id": credential_id, + "description": description, + }, + ) + + return credential_obj + + @classmethod + def get_or_create_credential( + cls, + bk_biz_id: int, + storage_type: str, + credential_type: str, + credential_auth_info: Dict[str, str], + ) -> models.BKJobFileCredential: + """ + 获取业务文件凭证,不存在会先行创建 + :param bk_biz_id: 业务ID + :param storage_type: 文件源类型,参考 constants.StorageType + :param credential_type: 凭证认证类型 + :param credential_auth_info: 凭证认证信息 + credential_access_key 凭证类型为ACCESS_KEY_SECRET_KEY时填写 + credential_secret_key 凭证类型为 ACCESS_KEY_SECRET_KEY / SECRET_KEY 时填写 + credential_username 凭证类型为 USERNAME_PASSWORD 时填写 + credential_password 凭证类型为 USERNAME_PASSWORD / PASSWORD 时填写 + :return: models.BKJobFileCredential + """ + try: + credential = models.BKJobFileCredential.objects.get(bk_biz_id=bk_biz_id, storage_type=storage_type) + return credential + except models.BKJobFileCredential.DoesNotExist: + return cls.register_credential( + bk_biz_id=bk_biz_id, + storage_type=storage_type, + credential_type=credential_type, + credential_auth_info=credential_auth_info, + ) + + +class BkJobFileSourceManager: + + FILE_SOURCE_CODE_TMPL: str = "NODEMAN_{storage_type}" + # 文件源别名 + FILE_SOURCE_ALIAS_TMPL = _("节点管理[biz:{bk_biz_id}]{storage_type_alias}文件源") + + @classmethod + def gen_file_source_code(cls, storage_type: str) -> str: + """ + 生成文件源编码 + :param storage_type: 文件源类型 + :return: 文件源编码 + """ + return cls.FILE_SOURCE_CODE_TMPL.format(storage_type=storage_type) + + @classmethod + def gen_file_source_alias(cls, bk_biz_id: int, storage_type: str) -> str: + """ + 生成文件源别名 + :param bk_biz_id: 业务ID + :param storage_type: 文件源类型 + :return: 文件源名称 + """ + return cls.FILE_SOURCE_ALIAS_TMPL.format( + bk_biz_id=bk_biz_id, storage_type_alias=get_valid_storage_alias(storage_type=storage_type) + ) + + @classmethod + def register_file_source( + cls, + credential: models.BKJobFileCredential, + access_params: Optional[Dict[str, Any]] = None, + ) -> models.BKJobFileSource: + """ + 注册文件源 + :param credential: 文件凭证model对象 + :param access_params: 文件源接入参数 + :return: models.BKJobFileSource + """ + access_params = access_params or {} + + code = cls.gen_file_source_code(storage_type=credential.storage_type) + alias = cls.gen_file_source_alias(bk_biz_id=credential.bk_biz_id, storage_type=credential.storage_type) + + create_file_source_query_params = { + "type": credential.storage_type, + "bk_biz_id": credential.bk_biz_id, + "credential_id": credential.credential_id, + "code": code, + "alias": alias, + "access_params": access_params, + } + + try: + file_source_id = JobApi.create_file_source(create_file_source_query_params)["id"] + except Exception as err: + raise exceptions.FilesRegisterCredentialError( + _("注册文件源失败:bk_biz_id -> {bk_biz_id}, err_msg -> {err_msg}").format( + bk_biz_id=credential.bk_biz_id, err_msg=err + ) + ) + + file_source_obj, __ = models.BKJobFileSource.objects.update_or_create( + credential_id=credential.credential_id, + defaults={"file_source_id": file_source_id, "code": code, "alias": alias, "access_params": access_params}, + ) + return file_source_obj + + @classmethod + def get_or_create_file_source( + cls, + bk_biz_id: int, + storage_type: str, + credential_type: str, + credential_auth_info: Dict[str, str], + access_params: Optional[Dict[str, Any]] = None, + ) -> models.BKJobFileSource: + """ + 获取文件源,不存在会先行创建 + :param bk_biz_id: 业务ID + :param storage_type: 文件源类型,参考 constants.StorageType + :param credential_type: 凭证认证类型 + :param credential_auth_info: 凭证认证信息 + credential_access_key 凭证类型为ACCESS_KEY_SECRET_KEY时填写 + credential_secret_key 凭证类型为 ACCESS_KEY_SECRET_KEY / SECRET_KEY 时填写 + credential_username 凭证类型为 USERNAME_PASSWORD 时填写 + credential_password 凭证类型为 USERNAME_PASSWORD / PASSWORD 时填写 + :param access_params: 文件源接入参数 + :return: models.BKJobFileCredential + """ + + credential = BkJobFileCredentialManager.get_or_create_credential( + bk_biz_id=bk_biz_id, + storage_type=storage_type, + credential_type=credential_type, + credential_auth_info=credential_auth_info, + ) + + try: + file_source = models.BKJobFileSource.objects.get(credential_id=credential.credential_id) + except models.BKJobFileSource.DoesNotExist: + file_source = cls.register_file_source(credential=credential, access_params=access_params) + + file_source.credential = credential + + return file_source diff --git a/apps/core/files/migrations/0001_initial.py b/apps/core/files/migrations/0001_initial.py new file mode 100644 index 000000000..b5afef44d --- /dev/null +++ b/apps/core/files/migrations/0001_initial.py @@ -0,0 +1,53 @@ +# Generated by Django 2.2.6 on 2021-08-27 11:07 + +import django_mysql.models +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="BKJobFileSource", + fields=[ + ( + "credential_id", + models.CharField(max_length=128, primary_key=True, serialize=False, verbose_name="凭证ID"), + ), + ("file_source_id", models.IntegerField(db_index=True, verbose_name="文件源ID")), + ("code", models.CharField(max_length=128, verbose_name="文件源标识")), + ("alias", models.CharField(max_length=128, verbose_name="文件源别名")), + ("access_params", django_mysql.models.JSONField(default=dict, verbose_name="文件源接入参数")), + ("file_prefix", models.CharField(default="", max_length=128, verbose_name="文件源标识")), + ("update_time", models.DateTimeField(auto_now=True, db_index=True, verbose_name="更新时间")), + ("create_time", models.DateTimeField(auto_now_add=True, db_index=True, verbose_name="创建时间")), + ], + options={ + "verbose_name": "作业平台文件源", + "verbose_name_plural": "作业平台文件源", + }, + ), + migrations.CreateModel( + name="BKJobFileCredential", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("bk_biz_id", models.IntegerField(db_index=True, verbose_name="业务ID")), + ("storage_type", models.CharField(db_index=True, max_length=128, verbose_name="文件源存储类型")), + ("name", models.CharField(max_length=128, verbose_name="凭证名称")), + ("type", models.CharField(max_length=64, verbose_name="凭证类型")), + ("description", models.TextField(default="", verbose_name="凭证描述")), + ("credential_id", models.CharField(max_length=128, verbose_name="凭证ID")), + ("update_time", models.DateTimeField(auto_now=True, db_index=True, verbose_name="更新时间")), + ("create_time", models.DateTimeField(auto_now_add=True, db_index=True, verbose_name="创建时间")), + ], + options={ + "verbose_name": "作业平台文件凭证", + "verbose_name_plural": "作业平台文件凭证", + "unique_together": {("bk_biz_id", "storage_type")}, + }, + ), + ] diff --git a/apps/core/files/migrations/__init__.py b/apps/core/files/migrations/__init__.py new file mode 100644 index 000000000..b402ee3b4 --- /dev/null +++ b/apps/core/files/migrations/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/apps/core/files/models.py b/apps/core/files/models.py new file mode 100644 index 000000000..1eafb1172 --- /dev/null +++ b/apps/core/files/models.py @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from django.db import models +from django.utils.translation import ugettext_lazy as _ +from django_mysql.models import JSONField + +from apps.utils.cache import class_member_cache + + +class BKJobFileCredential(models.Model): + bk_biz_id = models.IntegerField(_("业务ID"), db_index=True) + storage_type = models.CharField(_("文件源存储类型"), max_length=128, db_index=True) + + name = models.CharField(_("凭证名称"), max_length=128) + type = models.CharField(_("凭证类型"), max_length=64) + description = models.TextField(_("凭证描述"), default="") + credential_id = models.CharField(_("凭证ID"), max_length=128) + + update_time = models.DateTimeField(_("更新时间"), auto_now=True, db_index=True) + create_time = models.DateTimeField(_("创建时间"), auto_now_add=True, db_index=True) + + class Meta: + verbose_name = _("作业平台文件凭证") + verbose_name_plural = _("作业平台文件凭证") + # 唯一性校验,一个业务暂不支持配置同存储类型的多凭证 + unique_together = (("bk_biz_id", "storage_type"),) + + +class BKJobFileSource(models.Model): + credential_id = models.CharField(_("凭证ID"), max_length=128, primary_key=True) + file_source_id = models.IntegerField(_("文件源ID"), db_index=True) + + code = models.CharField(_("文件源标识"), max_length=128) + alias = models.CharField(_("文件源别名"), max_length=128) + access_params = JSONField(_("文件源接入参数"), default=dict) + file_prefix = models.CharField(_("文件源标识"), max_length=128, default="") + + update_time = models.DateTimeField(_("更新时间"), auto_now=True, db_index=True) + create_time = models.DateTimeField(_("创建时间"), auto_now_add=True, db_index=True) + + @property + @class_member_cache() + def credential(self): + return BKJobFileCredential.objects.get(credential_id=self.credential_id) + + @credential.setter + def credential(self, value): + if self.credential_id != value.id: + self.credential_id = value.id + self.save() + self._credential = value + + class Meta: + verbose_name = _("作业平台文件源") + verbose_name_plural = _("作业平台文件源") diff --git a/apps/core/files/storage.py b/apps/core/files/storage.py index 274fa7982..9a0e4a0ca 100644 --- a/apps/core/files/storage.py +++ b/apps/core/files/storage.py @@ -9,7 +9,7 @@ specific language governing permissions and limitations under the License. """ import os -from typing import Callable, Dict +from typing import Any, Callable, Dict, List from bkstorages.backends import bkrepo from django.conf import settings @@ -17,20 +17,25 @@ from django.utils.deconstruct import deconstructible from django.utils.functional import cached_property -from apps.core.files.base import StorageFileOverwriteMixin +from apps.utils.basic import filter_values + +from . import constants +from .base import BaseStorage +from .file_source import BkJobFileSourceManager @deconstructible -class CustomBKRepoStorage(StorageFileOverwriteMixin, bkrepo.BKRepoStorage): +class CustomBKRepoStorage(BaseStorage, bkrepo.BKRepoStorage): - location = getattr(settings, "BKREPO_LOCATION", "") - file_overwrite = getattr(settings, "FILE_OVERWRITE", False) + storage_type: str = constants.StorageType.BLUEKING_ARTIFACTORY.value + location: str = getattr(settings, "BKREPO_LOCATION", "") + file_overwrite: bool = getattr(settings, "FILE_OVERWRITE", False) - endpoint_url = settings.BKREPO_ENDPOINT_URL - username = settings.BKREPO_USERNAME - password = settings.BKREPO_PASSWORD - project_id = settings.BKREPO_PROJECT - bucket = settings.BKREPO_BUCKET + endpoint_url: str = settings.BKREPO_ENDPOINT_URL + username: str = settings.BKREPO_USERNAME + password: str = settings.BKREPO_PASSWORD + project_id: str = settings.BKREPO_PROJECT + bucket: str = settings.BKREPO_BUCKET def __init__( self, @@ -50,7 +55,11 @@ def __init__( bucket = bucket or self.bucket endpoint_url = endpoint_url or self.endpoint_url file_overwrite = file_overwrite or self.file_overwrite - super().__init__( + + # 根据 MRO 顺序,super() 仅调用 BaseStorage.__init__(),通过显式调用 BKRepoStorage 的初始化函数 + # 获得自定义 BaseStorage 类的重写特性,同时向 BKRepoStorage 注入成员变量 + bkrepo.BKRepoStorage.__init__( + self, root_path=root_path, username=username, password=password, @@ -60,10 +69,44 @@ def __init__( file_overwrite=file_overwrite, ) + def path(self, name): + raise NotImplementedError() + + def _handle_file_source_list( + self, file_source_list: List[Dict[str, Any]], extra_transfer_file_params: Dict[str, Any] + ) -> List[Dict[str, Any]]: + + # 获取或创建文件源 + file_source_obj = BkJobFileSourceManager.get_or_create_file_source( + bk_biz_id=extra_transfer_file_params["bk_biz_id"], + storage_type=self.storage_type, + credential_type=constants.FileCredentialType.USERNAME_PASSWORD.value, + credential_auth_info={"credential_username": self.username, "credential_password": self.password}, + access_params={"base_url": self.endpoint_url}, + ) + + file_source_with_source_info_list = [] + for file_source in file_source_list: + # 作业平台要求制品库分发的路径带上 project/bucket 前缀 + file_list = [ + os.path.join(self.project_id, self.bucket) + file_path for file_path in file_source.get("file_list", []) + ] + + file_source_with_source_info_list.append( + { + "file_list": file_list, + "file_source_id": file_source_obj.file_source_id, + "file_type": constants.StorageType.get_member_value__job_file_type_map()[self.storage_type], + } + ) + + return file_source_with_source_info_list + @deconstructible -class AdminFileSystemStorage(StorageFileOverwriteMixin, FileSystemStorage): +class AdminFileSystemStorage(BaseStorage, FileSystemStorage): + storage_type = constants.StorageType.FILE_SYSTEM.value safe_class = FileSystemStorage OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0) @@ -75,7 +118,8 @@ def __init__( directory_permissions_mode=None, file_overwrite=None, ): - super().__init__( + FileSystemStorage.__init__( + self, location=location, base_url=base_url, file_permissions_mode=file_permissions_mode, @@ -100,6 +144,37 @@ def location(self): """路径指向 / ,重写前路径指向「项目根目录」""" return self.base_location + def _handle_file_source_list( + self, file_source_list: List[Dict[str, Any]], extra_transfer_file_params: Dict[str, Any] + ) -> List[Dict[str, Any]]: + """ + 预处理源文件列表,添加文件源等信息 + :param file_source_list: 源文件 + :param extra_transfer_file_params: transfer_files 的其他参数 + :return: 源文件对象列表 + """ + + account = { + "alias": extra_transfer_file_params.get("account_alias"), + "id": extra_transfer_file_params.get("account_id"), + } + + file_source_with_source_info_list = [] + for file_source in file_source_list: + file_source_with_source_info_list.append( + { + "file_list": file_source.get("file_list", []), + "server": { + # 添加 NFS 服务器信息 + "ip_list": [{"bk_cloud_id": settings.DEFAULT_CLOUD_ID, "ip": settings.BKAPP_NFS_IP}] + }, + "account": filter_values(account), + "file_type": constants.StorageType.get_member_value__job_file_type_map()[self.storage_type], + } + ) + + return file_source_with_source_info_list + # 缓存最基础的Storage _STORAGE_OBJ_CACHE: [str, Storage] = {} @@ -125,7 +200,7 @@ def inner(storage_type: str = settings.STORAGE_TYPE, *args, **construct_params) @cache_storage_obj -def get_storage(storage_type: str = settings.STORAGE_TYPE, safe: bool = False, **construct_params) -> Storage: +def get_storage(storage_type: str = settings.STORAGE_TYPE, safe: bool = False, **construct_params) -> BaseStorage: """ 获取 Storage :param storage_type: 文件存储类型,参考 constants.StorageType diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index 67a485ecb..0203cfcf5 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -26,7 +26,6 @@ reverse_dict, tuple_choices, ) -from config.default import StorageType # 此值为历史遗留,后续蓝鲸不使用此字段后可废弃 DEFAULT_SUPPLIER_ID = 0 diff --git a/apps/node_man/handlers/plugin_v2.py b/apps/node_man/handlers/plugin_v2.py index 606fd331d..61c21af78 100644 --- a/apps/node_man/handlers/plugin_v2.py +++ b/apps/node_man/handlers/plugin_v2.py @@ -20,6 +20,7 @@ from django.core.files.uploadedfile import InMemoryUploadedFile from django.utils.translation import ugettext_lazy as _ +from apps.core.files import core_files_constants from apps.core.files.storage import get_storage from apps.node_man import constants, exceptions, models, tools from apps.node_man.constants import DEFAULT_CLOUD_NAME, IamActionType @@ -70,7 +71,7 @@ def upload(package_file: InMemoryUploadedFile, module: str) -> Dict[str, Any]: # 如果采用对象存储,文件直接上传至仓库,并将返回的目标路径传到后台,由后台进行校验并创建上传记录 # TODO 后续应该由前端上传文件并提供md5 - if settings.STORAGE_TYPE in constants.COS_TYPES: + if settings.STORAGE_TYPE in core_files_constants.StorageType.list_cos_member_values(): storage = get_storage() try: diff --git a/apps/node_man/management/commands/sync_plugin_status.py b/apps/node_man/management/commands/sync_plugin_status.py index 33114af82..62c19b6f2 100644 --- a/apps/node_man/management/commands/sync_plugin_status.py +++ b/apps/node_man/management/commands/sync_plugin_status.py @@ -12,9 +12,9 @@ from django.core.management.base import BaseCommand -from apps.node_man.periodic_tasks import sync_plugin_status_task +from apps.node_man.periodic_tasks import sync_proc_status_task class Command(BaseCommand): def handle(self, **kwargs): - sync_plugin_status_task() + sync_proc_status_task() diff --git a/apps/utils/basic.py b/apps/utils/basic.py index 89a2da70f..d127b6e62 100644 --- a/apps/utils/basic.py +++ b/apps/utils/basic.py @@ -44,10 +44,10 @@ def tuple_to_namedtuple(tupl): return dict_to_namedtuple(dict(tuple_choices(tupl))) -def filter_values(data: dict, filter_empty=False) -> dict: +def filter_values(data: Dict, filter_empty=False) -> Dict: """ 用于过滤空值 - :param filter_empty: 是否同时过滤空值 + :param filter_empty: 是否同时过滤布尔值为False的值 :param data: 存放各个映射关系的字典 :return: 去掉None值的字典 """ diff --git a/apps/utils/cache.py b/apps/utils/cache.py index 258936c83..e346058a2 100644 --- a/apps/utils/cache.py +++ b/apps/utils/cache.py @@ -8,7 +8,31 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from dogpile.cache import make_region -cache_dict = {} -region = make_region().configure("dogpile.cache.memory", arguments={"cache_dict": cache_dict}) +from functools import wraps +from typing import Callable, Optional + + +def class_member_cache(name: Optional[str] = None): + """ + 类成员缓存 + :param name: 缓存名称,为空则使用 class_func.__name__ + :return: + """ + + def class_member_cache_inner(class_func: Callable) -> Callable: + @wraps(class_func) + def wrapper(self, *args, **kwargs): + + cache_field = f"_{name or class_func.__name__}" + + cache_member = getattr(self, cache_field, None) + if cache_member: + return cache_member + cache_member = class_func(self, *args, **kwargs) + setattr(self, cache_field, cache_member) + return cache_member + + return wrapper + + return class_member_cache_inner diff --git a/apps/utils/enum.py b/apps/utils/enum.py new file mode 100644 index 000000000..e57a6beb2 --- /dev/null +++ b/apps/utils/enum.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import abc +from enum import Enum +from typing import Any, Dict, List, Tuple + +from ..utils.cache import class_member_cache + + +class EnhanceEnum(Enum): + """增强枚举类,提供常用的枚举值列举方法""" + + @classmethod + @abc.abstractmethod + def _get_member__alias_map(cls) -> Dict[Enum, str]: + """ + 获取枚举成员与释义的映射关系 + :return: + """ + raise NotImplementedError + + @classmethod + @class_member_cache() + def list_member_values(cls) -> List[Any]: + """ + 获取所有的枚举成员值 + :return: + """ + member_values = [] + for member in cls._member_names_: + member_values.append(cls._value2member_map_[member].value) + return member_values + + @classmethod + @class_member_cache() + def get_member_value__alias_map(cls) -> Dict[Any, str]: + """ + 获取枚举成员值与释义的映射关系,缓存计算结果 + :return: + """ + member_value__alias_map = {} + member__alias_map = cls._get_member__alias_map() + + for member, alias in member__alias_map.items(): + if type(member) is not cls: + raise ValueError(f"except member type -> {cls}, but got -> {type(member)}") + member_value__alias_map[member.value] = alias + + return member_value__alias_map + + @classmethod + @class_member_cache() + def list_choices(cls) -> List[Tuple[Any, Any]]: + """ + 获取可选项列表,一般用于序列化器、model的choices选项 + :return: + """ + return list(cls.get_member_value__alias_map().items()) diff --git a/apps/utils/files.py b/apps/utils/files.py index 8939bf8e1..df4505e68 100644 --- a/apps/utils/files.py +++ b/apps/utils/files.py @@ -10,9 +10,11 @@ """ import hashlib +import ntpath import os +import posixpath import uuid -from typing import IO, Any, Optional +from typing import IO, Any, Callable, Optional import requests @@ -124,3 +126,29 @@ def mk_and_return_tmpdir() -> str: tmp_dir = os.path.join(constants.TMP_DIR, constants.TMP_FILE_NAME_FORMAT.format(name=uuid.uuid4().hex)) os.makedirs(tmp_dir, exist_ok=True) return tmp_dir + + +class PathHandler: + """ + 根据传入的操作系统,返回文件路径处理模块 + 背景:需要根据「目标机器」的操作系统生成命令,涉及路径处理 + 如果是本地的路径处理,os.path 会根据本地的操作系统,返回正确的文件处理模块,无需使用该类 + """ + + # 文件处理模块 + path_handler: [ntpath, posixpath] + + def __init__(self, os_type: str): + self.os_type = os_type.lower() + if os_type == constants.OsType.WINDOWS: + self.path_handler = ntpath + else: + self.path_handler = posixpath + + def __getattr__(self, item: str) -> Callable: + """ + 重写__getattr__方法,使得该类的 .item 直接获取到 self.path_handle 的成员 + :param item: [ntpath, posixpath] method name + :return: [ntpath, posixpath] method + """ + return getattr(self.path_handler, item) diff --git a/common/api/modules/job.py b/common/api/modules/job.py index 4a77811d5..481f0bd91 100644 --- a/common/api/modules/job.py +++ b/common/api/modules/job.py @@ -49,3 +49,15 @@ def __init__(self): module=self.MODULE, description="根据作业实例ID查询作业执行日志", ) + self.create_credential = DataAPI( + method="POST", + url=JOB_APIGATEWAY_ROOT_V3 + "create_credential/", + module=self.MODULE, + description="新建凭证", + ) + self.create_file_source = DataAPI( + method="POST", + url=JOB_APIGATEWAY_ROOT_V3 + "create_file_source/", + module=self.MODULE, + description="新建文件源", + ) diff --git a/config/default.py b/config/default.py index a8bff1c69..cba2786ba 100644 --- a/config/default.py +++ b/config/default.py @@ -32,6 +32,7 @@ "version_log", "apps.node_man", "apps.backend", + "apps.core.files", "requests_tracker", # pipeline "pipeline", @@ -299,7 +300,7 @@ class StorageType(Enum): FILE_SYSTEM = "FILE_SYSTEM" # 制品库 - BKREPO = "BKREPO" + BLUEKING_ARTIFACTORY = "BLUEKING_ARTIFACTORY" # 用于控制默认的文件存储类型 @@ -323,7 +324,7 @@ class StorageType(Enum): # 存储类型 - storage class 映射关系 STORAGE_TYPE_IMPORT_PATH_MAP = { StorageType.FILE_SYSTEM.value: "apps.core.files.storage.AdminFileSystemStorage", - StorageType.BKREPO.value: "apps.core.files.storage.CustomBKRepoStorage", + StorageType.BLUEKING_ARTIFACTORY.value: "apps.core.files.storage.CustomBKRepoStorage", } # 默认的file storage @@ -338,7 +339,7 @@ class StorageType(Enum): # 暂时存在多个上传API的原因:原有文件上传接口被Nginx转发 STORAGE_TYPE_UPLOAD_API_MAP = { StorageType.FILE_SYSTEM.value: FILE_SYSTEM_UPLOAD_API, - StorageType.BKREPO.value: COS_UPLOAD_API, + StorageType.BLUEKING_ARTIFACTORY.value: COS_UPLOAD_API, } DEFAULT_FILE_UPLOAD_API = STORAGE_TYPE_UPLOAD_API_MAP[STORAGE_TYPE] @@ -522,7 +523,9 @@ class StorageType(Enum): } LOGGING["loggers"]["iam"] = {"handlers": ["iam"], "level": LOGGING["loggers"]["root"]["level"], "propagate": True} -# 节点管理后台 LAN_IP + +# TODO 目前后台使用 +# 节点管理后台 BKAPP_LAN_IP 或 BKAPP_NFS_IP 进行文件分发,是否能统一变量 BKAPP_LAN_IP = os.getenv("LAN_IP") # 节点管理后台 NFS_IP