From a4f9ea78d4bc26c302b675a0f7239afb39df9f3d Mon Sep 17 00:00:00 2001 From: crayon <873217631@qq.com> Date: Sat, 14 Aug 2021 11:10:46 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20=E6=8F=92=E4=BB=B6=E5=8C=85=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E6=94=AF=E6=8C=81=E5=AF=B9=E8=B1=A1=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F=EF=BC=8C=E5=85=AC=E5=85=B1=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=95=B4=E5=90=88=20(#2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/agent/manager.py | 4 +- apps/backend/agent/tools.py | 2 +- apps/backend/components/collections/agent.py | 2 +- .../components/collections/bulk_job.py | 2 +- .../components/collections/bulk_job_redis.py | 2 +- apps/backend/components/collections/job.py | 2 +- apps/backend/components/collections/plugin.py | 2 +- apps/backend/exceptions.py | 5 + .../management/commands/copy_file_to_nginx.py | 2 +- .../commands/init_official_plugins.py | 9 +- apps/backend/plugin/serializers.py | 38 +- apps/backend/plugin/tasks.py | 11 +- apps/backend/plugin/tools.py | 250 ++++++- apps/backend/plugin/views.py | 128 +++- apps/backend/tests/plugin/test_plugin.py | 21 +- apps/backend/urls.py | 8 +- apps/core/__init__.py | 15 + apps/core/files/__init__.py | 10 + apps/core/files/base.py | 52 ++ apps/core/files/storage.py | 145 ++++ apps/generic.py | 12 +- apps/node_man/constants.py | 59 +- apps/node_man/exceptions.py | 6 + apps/node_man/handlers/plugin_v2.py | 77 ++- apps/node_man/models.py | 633 +++++++----------- apps/node_man/tests/test_pluginv2.py | 24 - apps/node_man/tools/plugin_v2.py | 13 - apps/node_man/views/plugin_v2.py | 6 +- apps/utils/basic.py | 16 - apps/utils/env.py | 31 + apps/utils/files.py | 118 ++++ apps/utils/string.py | 31 + apps/utils/tests/test_env.py | 62 ++ config/default.py | 93 ++- requirements.txt | 2 + 35 files changed, 1320 insertions(+), 573 deletions(-) create mode 100644 apps/core/__init__.py create mode 100644 apps/core/files/__init__.py create mode 100644 apps/core/files/base.py create mode 100644 apps/core/files/storage.py create mode 100644 apps/utils/files.py create mode 100644 apps/utils/string.py create mode 100644 apps/utils/tests/test_env.py diff --git a/apps/backend/agent/manager.py b/apps/backend/agent/manager.py index b52d1a082..a193cc849 100644 --- a/apps/backend/agent/manager.py +++ b/apps/backend/agent/manager.py @@ -358,7 +358,7 @@ def push_files_to_proxy(self, file): type=Var.PLAIN, value=[{"ip": self.host_info["bk_host_innerip"], "bk_cloud_id": self.host_info["bk_cloud_id"]}], ) - act.component.inputs.file_target_path = Var(type=Var.PLAIN, value=settings.NGINX_DOWNLOAD_PATH) + act.component.inputs.file_target_path = Var(type=Var.PLAIN, value=settings.DOWNLOAD_PATH) act.component.inputs.files = Var(type=Var.PLAIN, value=file["files"]) act.component.inputs.from_type = Var(type=Var.PLAIN, value=file.get("from_type", "")) act.component.inputs.context = Var(type=Var.PLAIN, value="") @@ -369,7 +369,7 @@ def start_nginx(self): path = os.path.join(settings.PROJECT_ROOT, "script_tools", "start_nginx.sh.tpl") with open(path, encoding="utf-8") as fh: script = fh.read() - script_content = script % {"nginx_path": settings.NGINX_DOWNLOAD_PATH} + script_content = script % {"nginx_path": settings.DOWNLOAD_PATH} act = AgentServiceActivity(component_code=JobFastExecuteScriptComponent.code, name=_("启动 NGINX 服务")) act.component.inputs.job_client = Var( type=Var.PLAIN, diff --git a/apps/backend/agent/tools.py b/apps/backend/agent/tools.py index 7ecc3f857..5b24701df 100644 --- a/apps/backend/agent/tools.py +++ b/apps/backend/agent/tools.py @@ -130,7 +130,7 @@ def gen_commands(host, pipeline_id, is_uninstall, batch_install=False): download_cmd = f"curl {package_url}/{shell_file_name} -o {dest_dir}{shell_file_name} --connect-timeout 5 -sSf" else: - run_cmd_params.append(f"-L {settings.NGINX_DOWNLOAD_PATH}") + run_cmd_params.append(f"-L {settings.DOWNLOAD_PATH}") # 云区域自动安装 dest_host = host # 手动批量安装均走该分支 diff --git a/apps/backend/components/collections/agent.py b/apps/backend/components/collections/agent.py index 9e9afc275..5ec066b31 100644 --- a/apps/backend/components/collections/agent.py +++ b/apps/backend/components/collections/agent.py @@ -1005,7 +1005,7 @@ def execute(self, data, parent_data): self.logger.info(_("开始下发升级包")) host_info = data.get_one_of_inputs("host_info") host = Host.get_by_host_info(host_info) - nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH + nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH data.inputs.file_target_path = host.agent_config["temp_path"] os_type = host.os_type.lower() diff --git a/apps/backend/components/collections/bulk_job.py b/apps/backend/components/collections/bulk_job.py index cbf81c1ef..02915ad9f 100644 --- a/apps/backend/components/collections/bulk_job.py +++ b/apps/backend/components/collections/bulk_job.py @@ -423,7 +423,7 @@ def execute(self, data, parent_data): self.logger.info(_("开始下发升级包")) host_info = data.get_one_of_inputs("host_info") host = models.Host.get_by_host_info(host_info) - nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH + nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH data.inputs.file_target_path = host.agent_config["temp_path"] os_type = host.os_type.lower() diff --git a/apps/backend/components/collections/bulk_job_redis.py b/apps/backend/components/collections/bulk_job_redis.py index f48e18029..9013e9550 100644 --- a/apps/backend/components/collections/bulk_job_redis.py +++ b/apps/backend/components/collections/bulk_job_redis.py @@ -412,7 +412,7 @@ def execute(self, data, parent_data): self.logger.info(_("开始下发升级包")) host_info = data.get_one_of_inputs("host_info") host = models.Host.get_by_host_info(host_info) - nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH + nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH data.inputs.file_target_path = host.agent_config["temp_path"] os_type = host.os_type.lower() diff --git a/apps/backend/components/collections/job.py b/apps/backend/components/collections/job.py index 07267dab5..6bf3e1df6 100644 --- a/apps/backend/components/collections/job.py +++ b/apps/backend/components/collections/job.py @@ -573,7 +573,7 @@ def execute(self, data, parent_data): data.inputs.file_source = [ { - "files": [f"{settings.NGINX_DOWNLOAD_PATH}/{file}" for file in files], + "files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files], "account": "root", "ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}], } diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index 846c1b5cb..d6e4db644 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -386,7 +386,7 @@ def _execute(self, data, parent_data, common_data: CommonData): # 如 linux-arm、linux-x86、windows-x86 的插件,需分为三组 # 把多个IP合并为一个任务,可以利用GSE文件管道的BT能力,提高传输效率 jobs: Dict[str, Dict[str, Union[list, str]]] = defaultdict(lambda: defaultdict(list)) - nginx_path = settings.NGINX_DOWNLOAD_PATH + nginx_path = settings.DOWNLOAD_PATH for process_status in process_statuses: bk_host_id = process_status.bk_host_id subscription_instance = group_id_instance_map.get(process_status.group_id) diff --git a/apps/backend/exceptions.py b/apps/backend/exceptions.py index 052597a08..154eef5c9 100644 --- a/apps/backend/exceptions.py +++ b/apps/backend/exceptions.py @@ -51,3 +51,8 @@ class PackageVersionValidationError(BackendBaseException): class GenCommandsError(BackendBaseException): MESSAGE = _("安装命令生成失败") ERROR_CODE = 7 + + +class PluginParseError(BackendBaseException): + MESSAGE = _("插件解析错误") + ERROR_CODE = 8 diff --git a/apps/backend/management/commands/copy_file_to_nginx.py b/apps/backend/management/commands/copy_file_to_nginx.py index f65de4db0..c9aff9e6b 100644 --- a/apps/backend/management/commands/copy_file_to_nginx.py +++ b/apps/backend/management/commands/copy_file_to_nginx.py @@ -29,7 +29,7 @@ def handle(self, *args, **options): # 接入点配置的nginx路径 nginx_paths = [ap.nginx_path for ap in AccessPoint.objects.all() if ap.nginx_path] # 默认nginx路径 - nginx_paths.append(settings.NGINX_DOWNLOAD_PATH) + nginx_paths.append(settings.DOWNLOAD_PATH) # 去重 nginx_paths = list(set(nginx_paths)) for _path in os.listdir(settings.BK_SCRIPTS_PATH): diff --git a/apps/backend/management/commands/init_official_plugins.py b/apps/backend/management/commands/init_official_plugins.py index cff5c0df2..0d145edc0 100644 --- a/apps/backend/management/commands/init_official_plugins.py +++ b/apps/backend/management/commands/init_official_plugins.py @@ -19,8 +19,9 @@ from django.core.management.base import BaseCommand from django.db.transaction import atomic +from apps.backend.plugin import tools from apps.node_man import models -from apps.utils.basic import md5 +from apps.utils.files import md5sum from common.log import logger @@ -59,7 +60,7 @@ def handle(self, *args, **options): # 后续可以考虑通过路径来判断 module="gse_plugin", file_path=file_abs_path, - md5=md5(file_abs_path), + md5=md5sum(name=file_abs_path), operator="system", source_app_code="bk_nodeman", file_name=file_name, @@ -68,7 +69,9 @@ def handle(self, *args, **options): try: # 如果是官方内置的插件,那么应该是直接发布的 - package_list = upload_record.create_package_records( + package_list = tools.create_package_records( + file_path=upload_record.file_path, + file_name=upload_record.file_name, is_release=True, is_template_load=True, is_template_overwrite=True, diff --git a/apps/backend/plugin/serializers.py b/apps/backend/plugin/serializers.py index f40993cb8..c4d66b63a 100644 --- a/apps/backend/plugin/serializers.py +++ b/apps/backend/plugin/serializers.py @@ -13,11 +13,12 @@ import base64 import hashlib +from django.utils.translation import ugettext_lazy as _ from rest_framework import serializers from rest_framework.exceptions import ValidationError from apps.node_man import constants -from apps.node_man.models import GsePluginDesc, Packages +from apps.node_man.models import DownloadRecord, GsePluginDesc, Packages class GatewaySerializer(serializers.Serializer): @@ -202,14 +203,28 @@ def validate(self, attrs): return attrs -class UploadInfoSerializer(GatewaySerializer): +class UploadInfoBaseSerializer(GatewaySerializer): + md5 = serializers.CharField(help_text=_("上传端计算的文件md5"), max_length=32) + file_name = serializers.CharField(help_text=_("上传端提供的文件名"), min_length=1) + module = serializers.CharField(max_length=32, required=False, default="gse_plugin") + + +class UploadInfoSerializer(UploadInfoBaseSerializer): """上传插件包接口序列化器""" - module = serializers.CharField(max_length=32, required=False, default="gse_plugin") - md5 = serializers.CharField(max_length=32) - file_name = serializers.CharField() - file_local_path = serializers.CharField(max_length=512) - file_local_md5 = serializers.CharField(max_length=32) + file_local_path = serializers.CharField(help_text=_("本地文件路径"), max_length=512) + file_local_md5 = serializers.CharField(help_text=_("Nginx所计算的文件md5"), max_length=32) + + +class CosUploadInfoSerializer(UploadInfoBaseSerializer): + download_url = serializers.URLField(help_text=_("对象存储文件下载url"), required=False) + file_path = serializers.CharField(help_text=_("文件保存路径"), min_length=1, required=False) + + def validate(self, attrs): + # 两种参数模式少要有一种满足 + if not ("download_url" in attrs or "file_path" in attrs): + raise ValidationError("at least has download_url or file_path") + return attrs class PluginStartDebugSerializer(GatewaySerializer): @@ -287,6 +302,15 @@ def validate(self, data): creator = serializers.CharField() bk_app_code = serializers.CharField() + def validate(self, attrs): + if attrs["category"] not in DownloadRecord.CATEGORY_TASK_DICT: + raise ValidationError( + "请求下载类型 -> {category} 暂不支持,可选项 -> {choices}".format( + category=attrs["category"], choices=DownloadRecord.CATEGORY_CHOICES + ) + ) + return attrs + class DeletePluginSerializer(GatewaySerializer): name = serializers.CharField() diff --git a/apps/backend/plugin/tasks.py b/apps/backend/plugin/tasks.py index 60b6363c5..663e27113 100644 --- a/apps/backend/plugin/tasks.py +++ b/apps/backend/plugin/tasks.py @@ -18,6 +18,7 @@ from django.utils.translation import ugettext as _ from apps.backend.celery import app +from apps.backend.plugin import tools from apps.backend.utils.pipeline_parser import PipelineParser as CustomPipelineParser from apps.node_man import constants as const from apps.node_man import models @@ -41,7 +42,7 @@ def package_task(job_id, task_params): job = models.Job.objects.get(id=job_id, job_type=const.JobType.PACKING_PLUGIN) except models.Job.DoesNotExist: - logger.error("try to execute job->[%s] but is not exists") + logger.error("try to execute job-> {job_id} but is not exists".format(job_id=job_id)) return False try: @@ -52,10 +53,12 @@ def package_task(job_id, task_params): upload_package_object = models.UploadPackage.objects.filter(file_name=file_name).order_by("-upload_time")[0] # 2. 执行任务 - upload_package_object.create_package_records( + tools.create_package_records( + file_path=upload_package_object.file_path, + file_name=upload_package_object.file_name, is_release=is_release, creator=task_params["bk_username"], - select_pkg_abs_paths=select_pkg_abs_paths, + select_pkg_relative_paths=select_pkg_abs_paths, is_template_load=task_params.get("is_template_load", False), is_template_overwrite=task_params.get("is_template_overwrite", False), ) @@ -85,7 +88,7 @@ def package_task(job_id, task_params): job.save() if job.status == const.JobStatusType.SUCCESS: - logger.info("task->[%s] has finish all job." % job.id) + logger.info("task -> {job_id} has finish all job.".format(job_id=job.id)) @app.task(queue="backend") diff --git a/apps/backend/plugin/tools.py b/apps/backend/plugin/tools.py index c33ab9615..458178fbd 100644 --- a/apps/backend/plugin/tools.py +++ b/apps/backend/plugin/tools.py @@ -10,26 +10,147 @@ """ import logging import os +import shutil +import tarfile import traceback from collections import defaultdict -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import yaml +from django.db import transaction from django.utils.translation import ugettext_lazy as _ from packaging import version -from apps.backend.exceptions import PackageVersionValidationError +from apps.backend import exceptions +from apps.core.files.storage import get_storage from apps.node_man import constants, models +from apps.utils import files logger = logging.getLogger("app") -def parse_package(package_info: dict, is_update: bool, project: str) -> dict: +def list_package_infos(file_path: str) -> List[Dict[str, Any]]: + """ + :param file_path: 插件包所在路径 + 解析`self.file_path`下插件,获取包信息字典列表 + :return: [ + { + # 存放插件的临时目录 + "plugin_tmp_dir": "/tmp/12134/", + # 插件包的相对路径 + "pkg_relative_path": "plugins_linux_x86_64/package_name" + # 插件包的绝对路径 + "pkg_absolute_path": "/tmp/12134/plugins_linux_x86_64/package_name", + # 插件所需操作系统 + "package_os": "linux", + # 支持cpu位数 + "cpu_arch": "x86_64", + # 是否为自定义插件 + "is_external": "False" + }, + ... + ] + """ + storage = get_storage() + if not storage.exists(name=file_path): + raise exceptions.PluginParseError(_("插件不存在: file_path -> {file_path}").format(file_path=file_path)) + + # 解压压缩文件 + tmp_dir = files.mk_and_return_tmpdir() + with storage.open(name=file_path, mode="rb") as tf_from_storage: + with tarfile.open(fileobj=tf_from_storage) as tf: + # 检查是否存在可疑内容 + for file_info in tf.getmembers(): + if file_info.name.startswith("/") or "../" in file_info.name: + logger.error( + "file-> {file_path} contains member-> {name} try to escape!".format( + file_path=file_path, name=file_info.name + ) + ) + raise exceptions.PluginParseError(_("文件包含非法路径成员 -> {name},请检查").format(name=file_info.name)) + logger.info( + "file-> {file_path} extract to path -> {tmp_dir} success.".format(file_path=file_path, tmp_dir=tmp_dir) + ) + tf.extractall(path=tmp_dir) + + package_infos = [] + + # 遍历第一层的内容,获取操作系统和cpu架构信息,eg:external(可无,有表示自定义插件)_plugins_linux_x86_64 + for first_plugin_dir_name in os.listdir(tmp_dir): + # 通过正则提取出插件(plugin)目录名中的插件信息 + re_match = constants.PACKAGE_PATH_RE.match(first_plugin_dir_name) + if re_match is None: + logger.info( + "pkg_dir_name -> {pkg_dir_name} is not match re, jump it.".format(pkg_dir_name=first_plugin_dir_name) + ) + continue + + # 将文件名解析为插件信息字典 + plugin_info_dict = re_match.groupdict() + current_os = plugin_info_dict["os"] + cpu_arch = plugin_info_dict["cpu_arch"] + logger.info( + "pkg_dir_name -> {pkg_dir_name} is match for os -> {os}, cpu -> {cpu_arch}".format( + pkg_dir_name=first_plugin_dir_name, os=current_os, cpu_arch=cpu_arch + ) + ) + + first_level_plugin_path = os.path.join(tmp_dir, first_plugin_dir_name) + # 遍历第二层的内容,获取包名, eg:plugins_linux_x86_64/package_name + for second_package_dir_name in os.listdir(first_level_plugin_path): + # 拼接获取包路径 + second_level_package_dir_path = os.path.join(first_level_plugin_path, second_package_dir_name) + if not os.path.isdir(second_level_package_dir_path): + logger.info("found file path -> {path} jump it".format(path=second_level_package_dir_path)) + continue + + package_infos.append( + { + "plugin_tmp_dir": tmp_dir, + "pkg_relative_path": os.path.join(first_plugin_dir_name, second_package_dir_name), + "pkg_absolute_path": second_level_package_dir_path, + "package_os": current_os, + "cpu_arch": cpu_arch, + "is_external": plugin_info_dict["is_external"] is not None, + } + ) + + return package_infos + + +def parse_package(package_info: Dict[str, Any], is_update: bool, project: str) -> dict: + """ + 解析插件包 + :param package_info: 插件包信息 + :param is_update: 是否校验版本更新 + :param project: 插件名称,在该项目中,plugin_name, project 都表示插件名称 + :return: + { + "result": true, + "message": "更新插件版本", + "pkg_name": "bkmonitorbeat-2.0.0.tgz", + "pkg_abs_path": "plugins_linux_x86/bkmonitorbeat", + "project": "bkmonitorbeat", + "version": "2.0.0", + "category": "官方插件", + "config_templates": [ + { + "name": "bkmonitorbeat.conf", + "version": "2.0.0", + "is_main": 1 + } + ], + "os": "linux", + "cpu_arch": "x86", + "description": "蓝鲸监控指标采集器" + } + """ package_parse_detail = { "result": True, "message": "", "pkg_name": None, - "pkg_abs_path": package_info["pkg_abs_path"], + # TODO 命名错误,该字段表示插件包的相对路径,命名却是绝对路径,需要和前端配合修改,暂时保留 + "pkg_abs_path": package_info["pkg_relative_path"], "project": None, "version": None, "category": None, @@ -41,10 +162,12 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: update_flag = False # 1. 判断是否存在project.yaml文件 - project_file_path = os.path.join(package_info["dir_path"], "project.yaml") - if not os.path.exists(project_file_path): + project_yaml_file_path = os.path.join(package_info["pkg_absolute_path"], "project.yaml") + if not os.path.exists(project_yaml_file_path): logger.warning( - "try to pack path->[%s] but is not [project.yaml] file under file path" % package_info["dir_path"] + "try to pack path-> {pkg_absolute_path} but is not [project.yaml] file under file path".format( + pkg_absolute_path=package_info["pkg_absolute_path"] + ) ) package_parse_detail["result"] = False package_parse_detail["message"] = _("缺少project.yaml文件") @@ -52,13 +175,15 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: # 2. 解析project.yaml文件(版本,插件名等信息) try: - with open(project_file_path, "r", encoding="utf-8") as project_file: + with open(project_yaml_file_path, "r", encoding="utf-8") as project_file: yaml_config = yaml.safe_load(project_file) if not isinstance(yaml_config, dict): raise yaml.YAMLError except (IOError, yaml.YAMLError): logger.warning( - "failed to parse or read project_yaml->[{}] for->[{}]".format(project_file_path, traceback.format_exc()) + "failed to parse or read project_yaml -> {project_yaml_file_path}, for -> {err_msg}".format( + project_yaml_file_path=project_yaml_file_path, err_msg=traceback.format_exc() + ) ) package_parse_detail["result"] = False package_parse_detail["message"] = _("project.yaml文件解析读取失败") @@ -70,7 +195,7 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: package_parse_detail.update( { - "pkg_name": "{}-{}.tgz".format(yaml_config["name"], yaml_config["version"]), + "pkg_name": "{name}-{version}.tgz".format(name=yaml_config["name"], version=yaml_config["version"]), "project": yaml_config["name"], "version": yaml_config["version"], "category": yaml_config["category"], @@ -79,8 +204,9 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: ) except KeyError: logger.warning( - "failed to get key info from project.yaml->[%s] for->[%s] maybe config file error?" - % (project_file_path, traceback.format_exc()) + "failed to get key info from project.yaml -> {project_yaml_file_path}, for -> {err_msg}".format( + project_yaml_file_path=project_yaml_file_path, err_msg=traceback.format_exc() + ) ) package_parse_detail["result"] = False package_parse_detail["message"] = _("project.yaml文件信息缺失") @@ -90,7 +216,7 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: # 更新插件名称对应不上 if is_update and yaml_config["name"] != project: - raise PackageVersionValidationError( + raise exceptions.PackageVersionValidationError( _("期望更新的插件为[{project}],实际上传的插件为[{update_plugin_name}]").format( project=project, update_plugin_name=yaml_config["name"] ) @@ -99,8 +225,10 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: # 3. 判断插件类型是否符合预取 if yaml_config["category"] not in constants.CATEGORY_TUPLE: logger.warning( - "project->[%s] version->[%s] update(or create) with category->[%s] which is not acceptable, " - "nothing will do." % (yaml_config["name"], yaml_config["version"], yaml_config["category"]) + "project-> {project}, version-> {version}: update(or create) with category-> {category} " + "which is not acceptable, nothing will do.".format( + project=yaml_config["name"], version=yaml_config["version"], category=yaml_config["category"] + ) ) package_parse_detail["result"] = False package_parse_detail["message"] = _("project.yaml中category配置异常,请确认后重试") @@ -113,17 +241,23 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: # 4. 判断是否为新增插件 if not packages_queryset.exists(): logger.info( - "project->[%s] os->[%s] cpu_arch->[%s] is not exists, this operations will create new package" - % (yaml_config["name"], package_info["package_os"], package_info["cpu_arch"]) + "project-> {project}, os-> {os}, cpu_arch-> {cpu_arch} is not exists, " + "this operations will create new package".format( + project=yaml_config["name"], os=package_info["package_os"], cpu_arch=package_info["cpu_arch"] + ) ) package_parse_detail["message"] = _("新增插件") # 5. 判断以前是否已发布过该插件版本 elif packages_queryset.filter(version=yaml_config["version"], is_release_version=True, is_ready=True).exists(): logger.warning( - "project->[%s] version->[%s] os->[%s] cpu_arch->[%s] is release, no more operations is " - "allowed." - % (yaml_config["name"], yaml_config["version"], package_info["package_os"], package_info["cpu_arch"]) + "project-> {project}, version-> {version}, os-> {os}, cpu_arch -> {cpu_arch} is release, " + "will overwrite it".format( + project=yaml_config["name"], + version=yaml_config["version"], + os=package_info["package_os"], + cpu_arch=package_info["cpu_arch"], + ) ) package_parse_detail["message"] = _("已有版本插件更新") @@ -141,16 +275,16 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: package_parse_detail["message"] = _("更新插件版本") logger.info( - "project->[{project}] validate version: is_update->[{is_update}], update_flag->[{update_flag}]".format( + "project-> {project} validate version: is_update -> {is_update}, update_flag -> {update_flag}".format( project=yaml_config["name"], is_update=is_update, update_flag=update_flag ) ) # 需要校验版本更新,但该插件没有升级 if is_update and not update_flag: - raise PackageVersionValidationError( - _("文件路径[{pkg_abs_path}]所在包解析版本为[{parse_version}], 最新版本为[{release_version}], 更新校验失败").format( - pkg_abs_path=package_parse_detail["pkg_abs_path"], + raise exceptions.PackageVersionValidationError( + _("文件路径[{pkg_relative_path}]所在包解析版本为[{parse_version}], 最新版本为[{release_version}], 更新校验失败").format( + pkg_relative_path=package_parse_detail["pkg_abs_path"], parse_version=package_parse_detail["version"], release_version=package_release_version, ) @@ -160,14 +294,15 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: config_templates = yaml_config.get("config_templates", []) for config_template in config_templates: source_path = config_template["source_path"] - template_file_path = os.path.join(package_info["dir_path"], source_path) + template_file_path = os.path.join(package_info["pkg_absolute_path"], source_path) if not os.path.exists(template_file_path): logger.warning( - "project.yaml need to import file->[%s] but is not exists, nothing will do." - % config_template["source_path"] + "project.yaml need to import file-> {source_path} but is not exists, nothing will do.".format( + source_path=config_template["source_path"] + ) ) package_parse_detail["result"] = False - package_parse_detail["message"] = _("找不到需要导入的配置模板文件[%s]") % source_path + package_parse_detail["message"] = _("找不到需要导入的配置模板文件[{source_path}]").format(source_path=source_path) return package_parse_detail package_parse_detail["config_templates"].append( @@ -194,3 +329,62 @@ def fetch_latest_config_templates(config_templates: List[Dict[str, Any]]) -> Lis latest_config_templates.append(config_tmpls_order_by_version[-1]) return latest_config_templates + + +def create_package_records( + file_path: str, + file_name: str, + is_release: bool, + creator: Optional[str] = None, + select_pkg_relative_paths: Optional[List[str]] = None, + is_template_load: bool = False, + is_template_overwrite: bool = False, +) -> List[models.Packages]: + """ + 解析上传插件,拆分为插件包并保存记录 + :param file_path: 上传插件所在路径 + :param file_name: 上传插件名称 + :param is_release: 是否正式发布 + :param creator: 操作人 + :param select_pkg_relative_paths: 指定注册插件包的相对路径列表 + :param is_template_load: 是否需要读取配置文件 + :param is_template_overwrite: 是否可以覆盖已经存在的配置文件 + :return: [package_object, ...] + :return: + """ + pkg_record_objs = [] + package_infos = list_package_infos(file_path=file_path) + + with transaction.atomic(): + for package_info in package_infos: + if not ( + select_pkg_relative_paths is None or package_info["pkg_relative_path"] in select_pkg_relative_paths + ): + logger.info("path -> {path} not selected, jump it".format(path=package_info["pkg_relative_path"])) + continue + pkg_record_obj = models.Packages.create_record( + dir_path=package_info["pkg_absolute_path"], + package_os=package_info["package_os"], + cpu_arch=package_info["cpu_arch"], + is_release=is_release, + creator=creator, + is_external=package_info["is_external"], + is_template_load=is_template_load, + is_template_overwrite=is_template_overwrite, + ) + + logger.info( + "package path -> {path} add to pkg record-> {record_id} success.".format( + path=package_info["pkg_relative_path"], record_id=pkg_record_obj.id + ) + ) + pkg_record_objs.append(pkg_record_obj) + + logger.info("plugin -> {file_name} create pkg record all done.".format(file_name=file_name)) + + # 清理临时解压目录 + plugin_tmp_dirs = set([package_info["plugin_tmp_dir"] for package_info in package_infos]) + for plugin_tmp_dir in plugin_tmp_dirs: + shutil.rmtree(plugin_tmp_dir) + + return pkg_record_objs diff --git a/apps/backend/plugin/views.py b/apps/backend/plugin/views.py index 9420a9770..548973c82 100644 --- a/apps/backend/plugin/views.py +++ b/apps/backend/plugin/views.py @@ -17,6 +17,7 @@ import logging import os import re +import shutil from collections import defaultdict from copy import deepcopy from itertools import groupby @@ -46,10 +47,12 @@ ) from apps.backend.subscription.handler import SubscriptionHandler from apps.backend.subscription.tasks import run_subscription_task_and_create_instance +from apps.core.files.storage import get_storage from apps.exceptions import AppBaseException, ValidationError from apps.generic import APIViewSet from apps.node_man import constants as const from apps.node_man import models +from apps.utils import files LOG_PREFIX_RE = re.compile(r"(\[\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}.*?\] )") logger = logging.getLogger("app") @@ -159,7 +162,9 @@ def create_plugin_register_task(self, request): ) # 这个新的任务,应该是指派到自己机器上的打包任务 tasks.package_task.delay(job.id, params) - logger.info("create job->[{}] to unpack file->[{}] plugin".format(job.id, file_name)) + logger.info( + "create job-> {job_id} to unpack file-> {file_name} plugin".format(job_id=job.id, file_name=file_name) + ) return Response({"job_id": job.id}) @@ -715,16 +720,18 @@ def create_export_task(self, request): record = models.DownloadRecord.create_record( category=params["category"], query_params=params["query_params"], - creator=params["creator"], + creator=params["bk_username"], source_app_code=params["bk_app_code"], ) logger.info( - "user->[%s] request to export from system->[%s] success created record->[%s]." - % (params["creator"], params["bk_app_code"], record.id) + "user -> {username} request to export from system -> {bk_app_code} success created " + "record -> {record_id}.".format( + username=params["bk_username"], bk_app_code=params["bk_app_code"], record_id=record.id + ) ) tasks.export_plugin.delay(record.id) - logger.info("record->[%s] now is active to celery" % record.id) + logger.info("record-> {record_id} now is active to celery".format(record_id=record.id)) return Response({"job_id": record.id}) @@ -753,19 +760,30 @@ def query_export_task(self, request): try: record = models.DownloadRecord.objects.get(id=job_id) except models.DownloadRecord.DoesNotExist: - logger.error("record->[%s] not exists, something go wrong?" % job_id) + logger.error("record-> {record_id} not exists, something go wrong?".format(record_id=job_id)) raise ValueError(_("请求任务不存在,请确认后重试")) - uri_with_params = "?".join([settings.BKAPP_NODEMAN_DOWNLOAD_URL, record.download_params]) + if record.is_failed or not record.file_path: + download_url = "" + else: + # TODO: 此处后续需要提供一个统一的 storage.tmp_url(name) 方法,用于插件包的临时下载 + if settings.STORAGE_TYPE in const.COS_TYPES: + download_url = get_storage().url(record.file_path) + else: + download_url = "?".join([settings.BKAPP_NODEMAN_DOWNLOAD_API, record.download_params]) response_data = { "is_finish": record.is_finish, "is_failed": record.is_failed, - "download_url": uri_with_params if not record.is_failed else "", # 下载URL + "download_url": download_url, "error_message": record.error_message, } - logger.info("export record->[{}] response_data->[{}]".format(job_id, response_data)) + logger.info( + "export record -> {record_id} response_data -> {response_data}".format( + record_id=job_id, response_data=response_data + ) + ) return Response(response_data) @action(detail=False, methods=["POST"], url_path="parse") @@ -820,13 +838,20 @@ def parse(self, request): ) if upload_package_obj is None: raise exceptions.UploadPackageNotExistError(_("找不到请求发布的文件,请确认后重试")) - # 获取包信息并解析 - return Response( - [ - tools.parse_package(package_info, params["is_update"], params.get("project")) - for package_info in upload_package_obj.list_package_infos() - ] - ) + + # 获取插件中各个插件包的路径信息 + package_infos = tools.list_package_infos(file_path=upload_package_obj.file_path) + # 解析插件包 + parse_package_results = [] + for package_info in package_infos: + parse_package_result = tools.parse_package(package_info, params["is_update"], params.get("project")) + parse_package_results.append(parse_package_result) + + # 清理临时解压目录 + plugin_tmp_dirs = set([package_info["plugin_tmp_dir"] for package_info in package_infos]) + for plugin_tmp_dir in plugin_tmp_dirs: + shutil.rmtree(plugin_tmp_dir) + return Response(parse_package_results) def list(self, request, *args, **kwargs): """ @@ -1245,6 +1270,77 @@ def upload_package(request): ) +@csrf_exempt +@login_exempt +def upload_package_by_cos(request): + ser = serializers.CosUploadInfoSerializer(data=request.POST) + if not ser.is_valid(): + logger.error("failed to valid request data for->[%s] maybe something go wrong?" % ser.errors) + raise ValidationError(_("请求参数异常 [{err}],请确认后重试").format(err=ser.errors)) + + md5 = ser.data["md5"] + origin_file_name = ser.data["file_name"] + file_path = ser.data.get("file_path") + download_url: str = ser.data.get("download_url") + + storage = get_storage() + + # TODO 此处的md5校验放到文件实际读取使用的地方更合理? + # file_path 不为空表示文件已在项目管理的对象存储上,此时仅需校验md5,减少文件IO + if file_path: + try: + if files.md5sum(file_obj=storage.open(name=file_path)) != md5: + raise ValidationError(_("上传文件MD5校验失败,请确认重试")) + except Exception as e: + raise ValidationError(_("文件不存在:file_path -> {file_path},error -> {err}").format(file_path=file_path, err=e)) + else: + # 创建临时存放下载插件的目录 + tmp_dir = files.mk_and_return_tmpdir() + with open(file=os.path.join(tmp_dir, origin_file_name), mode="wb+") as fs: + # 下载文件并写入fs + files.download_file(url=download_url, file_obj=fs, closed=False) + # 计算下载文件的md5 + local_md5 = files.md5sum(file_obj=fs, closed=False) + if local_md5 != md5: + logger.error( + "failed to valid file md5 local->[{}] user->[{}] maybe network error".format(local_md5, md5) + ) + raise ValidationError(_("上传文件MD5校验失败,请确认重试")) + + # 使用上传端提供的期望保存文件名,保存文件到项目所管控的存储 + file_path = storage.save(name=os.path.join(settings.UPLOAD_PATH, origin_file_name), content=fs) + + # 移除临时目录 + shutil.rmtree(tmp_dir) + + record = models.UploadPackage.create_record( + module=ser.data["module"], + file_path=file_path, + md5=md5, + operator=ser.data["bk_username"], + source_app_code=ser.data["bk_app_code"], + # 此处使用落地到文件系统的文件名,对象存储情况下文件已经写到仓库,使用接口传入的file_name会在后续判断中再转移一次文件 + file_name=os.path.basename(file_path), + ) + logger.info( + "user->[%s] from app->[%s] upload file->[%s] success." + % (record.creator, record.source_app_code, record.file_path) + ) + + return JsonResponse( + { + "result": True, + "message": "", + "code": "00", + "data": { + "id": record.id, # 包文件的ID + "name": record.file_name, # 包名 + "pkg_size": record.file_size, # 单位byte + }, + } + ) + + @csrf_exempt @login_exempt def export_download(request): diff --git a/apps/backend/tests/plugin/test_plugin.py b/apps/backend/tests/plugin/test_plugin.py index e7fa4ae16..1b2d7d3de 100644 --- a/apps/backend/tests/plugin/test_plugin.py +++ b/apps/backend/tests/plugin/test_plugin.py @@ -21,6 +21,7 @@ from django.core.management import call_command from django.test import TestCase +from apps.backend.plugin import tools from apps.backend.plugin.tasks import export_plugin, package_task, run_pipeline from apps.backend.tests.plugin import utils from apps.backend.tests.plugin.test_plugin_status_change import TestApiBase @@ -184,7 +185,7 @@ def setUp(self): tfile.add(temp_file_path, arcname=".", recursive=True) # nginx的模拟路径 - settings.NGINX_DOWNLOAD_PATH = self.temp_path + settings.DOWNLOAD_PATH = self.temp_path settings.UPLOAD_PATH = self.upload_path settings.EXPORT_PATH = self.export_path @@ -204,8 +205,8 @@ def test_create_upload_record_and_register(self): """测试创建上传包记录功能""" # 插件包注册后存放地址 - windows_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") - linux_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") + windows_file_path = os.path.join(settings.DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") + linux_file_path = os.path.join(settings.DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") # 验证创建前此时文件不存在 self.assertFalse(os.path.exists(linux_file_path)) @@ -232,7 +233,9 @@ def test_create_upload_record_and_register(self): # 测试单独注册插件包功能 upload_object = UploadPackage.objects.get(file_name=self.tarfile_name) - package_object_list = upload_object.create_package_records(is_release=True) + package_object_list = tools.create_package_records( + file_path=upload_object.file_path, file_name=upload_object.file_name, is_release=True + ) self.assertEqual( GsePluginDesc.objects.get(name="test_plugin").node_manage_control, @@ -316,8 +319,8 @@ def _test_upload_api_success(self): def _test_register_api_success(self): # 插件包注册后存放地址 - windows_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") - linux_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") + windows_file_path = os.path.join(settings.DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") + linux_file_path = os.path.join(settings.DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") # 验证创建前此时文件不存在 self.assertFalse(os.path.exists(linux_file_path)) @@ -371,8 +374,8 @@ def test_create_task_register_optional_api(self): self._test_upload_api_success() # 插件包注册后存放地址 - windows_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") - linux_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") + windows_file_path = os.path.join(settings.DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") + linux_file_path = os.path.join(settings.DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") # 验证创建前此时文件不存在 self.assertFalse(os.path.exists(linux_file_path)) @@ -773,7 +776,7 @@ def setUp(self): tfile.add(temp_file_path, arcname=".", recursive=True) # nginx的模拟路径 - settings.NGINX_DOWNLOAD_PATH = settings.UPLOAD_PATH = self.target_path + settings.DOWNLOAD_PATH = settings.UPLOAD_PATH = self.target_path settings.BK_OFFICIAL_PLUGINS_INIT_PATH = self.temp_path diff --git a/apps/backend/urls.py b/apps/backend/urls.py index 8bc179e4c..1a6f1d20f 100644 --- a/apps/backend/urls.py +++ b/apps/backend/urls.py @@ -13,7 +13,12 @@ from apps.backend import views from apps.backend.healthz.views import HealthzViewSet -from apps.backend.plugin.views import PluginViewSet, export_download, upload_package +from apps.backend.plugin.views import ( + PluginViewSet, + export_download, + upload_package, + upload_package_by_cos, +) from apps.backend.subscription.views import SubscriptionViewSet routers = drf_routers.DefaultRouter(trailing_slash=True) @@ -25,6 +30,7 @@ urlpatterns = [ url(r"api/", include(routers.urls)), url(r"^package/upload/$", upload_package), + url(r"^package/upload_cos/$", upload_package_by_cos), url(r"^export/download/$", export_download, name="export_download"), url(r"^export/", include(export_routers.urls)), url(r"^get_gse_config/", views.get_gse_config), diff --git a/apps/core/__init__.py b/apps/core/__init__.py new file mode 100644 index 000000000..5365eb9da --- /dev/null +++ b/apps/core/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +""" +core 文件夹用于放置 backend / node_man 等模块公用且具有一定代码规模的模块 +区别于utils:放置小型Python函数和类,这类代码用于简化代码逻辑,且与业务无关 +""" diff --git a/apps/core/files/__init__.py b/apps/core/files/__init__.py new file mode 100644 index 000000000..b402ee3b4 --- /dev/null +++ b/apps/core/files/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/apps/core/files/base.py b/apps/core/files/base.py new file mode 100644 index 000000000..60c7f294e --- /dev/null +++ b/apps/core/files/base.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import os + +from django.conf import settings +from django.core.exceptions import SuspiciousFileOperation +from django.utils.crypto import get_random_string + + +class StorageFileOverwriteMixin: + + file_overwrite = settings.FILE_OVERWRITE + + def get_available_name(self, name, max_length=None): + """重写获取文件有效名称函数,支持在 file_overwrite=True 时不随机生成文件名""" + + dir_name, file_name = os.path.split(name) + file_root, file_ext = os.path.splitext(file_name) + + def _gen_random_name(_file_root) -> str: + # 在文件名的起始位置添加随机串,源码规则为 "%s_%s%s" % (_file_root, get_random_string(7), file_ext) + # 上述规则对 .tar.gz 不友好,会在类型后缀中间加随机串,所以改为随机串作为前缀 + return os.path.join(dir_name, "%s_%s%s" % (get_random_string(7), _file_root, file_ext)) + + # not self.file_overwrite and self.exists(name) 利用 and 短路特点,如果 file_overwrite=True 就无需校验文件是否存在 + while (not self.file_overwrite and self.exists(name)) or (max_length and len(name) > max_length): + # file_ext includes the dot. + name = name if self.file_overwrite else _gen_random_name(file_root) + + if max_length is None: + continue + # Truncate file_root if max_length exceeded. + truncation = len(name) - max_length + if truncation > 0: + file_root = file_root[:-truncation] + # Entire file_root was truncated in attempt to find an available filename. + if not file_root: + raise SuspiciousFileOperation( + 'Storage can not find an available filename for "%s". ' + "Please make sure that the corresponding file field " + 'allows sufficient "max_length".' % name + ) + name = name if self.file_overwrite else _gen_random_name(file_root) + return name diff --git a/apps/core/files/storage.py b/apps/core/files/storage.py new file mode 100644 index 000000000..274fa7982 --- /dev/null +++ b/apps/core/files/storage.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import os +from typing import Callable, Dict + +from bkstorages.backends import bkrepo +from django.conf import settings +from django.core.files.storage import FileSystemStorage, Storage, get_storage_class +from django.utils.deconstruct import deconstructible +from django.utils.functional import cached_property + +from apps.core.files.base import StorageFileOverwriteMixin + + +@deconstructible +class CustomBKRepoStorage(StorageFileOverwriteMixin, bkrepo.BKRepoStorage): + + location = getattr(settings, "BKREPO_LOCATION", "") + file_overwrite = getattr(settings, "FILE_OVERWRITE", False) + + endpoint_url = settings.BKREPO_ENDPOINT_URL + username = settings.BKREPO_USERNAME + password = settings.BKREPO_PASSWORD + project_id = settings.BKREPO_PROJECT + bucket = settings.BKREPO_BUCKET + + def __init__( + self, + root_path=None, + username=None, + password=None, + project_id=None, + bucket=None, + endpoint_url=None, + file_overwrite=None, + ): + # 类成员变量应该和构造函数解耦,通过 params or default 的形式给构造参数赋值,防止该类被继承扩展时需要覆盖全部的成员默认值 + root_path = root_path or self.location + username = username or self.username + password = password or self.password + project_id = project_id or self.project_id + bucket = bucket or self.bucket + endpoint_url = endpoint_url or self.endpoint_url + file_overwrite = file_overwrite or self.file_overwrite + super().__init__( + root_path=root_path, + username=username, + password=password, + project_id=project_id, + bucket=bucket, + endpoint_url=endpoint_url, + file_overwrite=file_overwrite, + ) + + +@deconstructible +class AdminFileSystemStorage(StorageFileOverwriteMixin, FileSystemStorage): + + safe_class = FileSystemStorage + OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0) + + def __init__( + self, + location=None, + base_url=None, + file_permissions_mode=None, + directory_permissions_mode=None, + file_overwrite=None, + ): + super().__init__( + location=location, + base_url=base_url, + file_permissions_mode=file_permissions_mode, + directory_permissions_mode=directory_permissions_mode, + ) + + if file_overwrite is not None and isinstance(file_overwrite, bool): + self.file_overwrite = file_overwrite + + # 如果文件允许覆盖,去掉 O_CREAT 配置,存在文件打开时不报错 + if self.file_overwrite: + self.OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | getattr(os, "O_BINARY", 0) + + # 重写 path,将 safe_join 替换为 os.path.join,从而满足往「项目根路径」外读写的需求 + # safe_join 仅允许项目根目录以内的读写,具体参考 -> django.utils._os safe_join + # 本项目的读写控制不存在用户行为,保留safe_mode成员变量,便于切换 + def path(self, name): + return os.path.join(self.location, name) + + @cached_property + def location(self): + """路径指向 / ,重写前路径指向「项目根目录」""" + return self.base_location + + +# 缓存最基础的Storage +_STORAGE_OBJ_CACHE: [str, Storage] = {} + + +def cache_storage_obj(get_storage_func: Callable[[str, Dict], Storage]): + """用于Storage 缓存读写的装饰器""" + + def inner(storage_type: str = settings.STORAGE_TYPE, *args, **construct_params) -> Storage: + # 仅默认参数情况下返回缓存 + if not (construct_params or args) and storage_type in _STORAGE_OBJ_CACHE: + return _STORAGE_OBJ_CACHE[storage_type] + + storage_obj = get_storage_func(storage_type, *args, **construct_params) + + # 仅默认参数情况下写入缓存 + if not (construct_params or args): + _STORAGE_OBJ_CACHE[storage_type] = storage_obj + + return storage_obj + + return inner + + +@cache_storage_obj +def get_storage(storage_type: str = settings.STORAGE_TYPE, safe: bool = False, **construct_params) -> Storage: + """ + 获取 Storage + :param storage_type: 文件存储类型,参考 constants.StorageType + :param safe: 是否启用安全访问,当前项目不存在用户直接指定上传路径的情况,该字段使用默认值即可 + :param construct_params: storage class 构造参数,用于修改storage某些默认行为(写入仓库、base url等) + :return: Storage实例 + """ + storage_import_path = settings.STORAGE_TYPE_IMPORT_PATH_MAP.get(storage_type) + if storage_import_path is None: + raise ValueError(f"please provide valid storage_type {settings.STORAGE_TYPE_IMPORT_PATH_MAP.values()}") + storage_class = get_storage_class(import_path=storage_import_path) + + if safe: + if not hasattr(storage_class, "safe_class"): + raise ValueError(f"please add safe_class to {storage_class.__name__}") + return storage_class.safe_class(**construct_params) + return storage_class(**construct_params) diff --git a/apps/generic.py b/apps/generic.py index 581ebc872..09f9d9257 100644 --- a/apps/generic.py +++ b/apps/generic.py @@ -139,9 +139,17 @@ def custom_exception_handler(exc, context): """ logger.exception(getattr(exc, "message", exc)) request = context["request"] + + if request.method == "GET": + request_params = request.query_params + else: + if "multipart/form-data" in request.headers.get("Content-Type", ""): + request_params = {"files": str(getattr(request, "FILES"))} + else: + request_params = request.data + logger.error( - """捕获未处理异常, 请求URL->[%s], 请求方法->[%s] 请求参数->[%s]""" - % (request.path, request.method, json.dumps(request.query_params if request.method == "GET" else request.data)) + """捕获未处理异常, 请求URL->[%s], 请求方法->[%s] 请求参数->[%s]""" % (request.path, request.method, json.dumps(request_params)) ) # 专门处理 404 异常,直接返回前端,前端处理 if isinstance(exc, Http404): diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index 9ae5b2e71..b3efdc1a5 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -14,6 +14,7 @@ import os import re from enum import Enum +from typing import List from django.conf import settings from django.utils.translation import ugettext_lazy as _ @@ -25,16 +26,23 @@ reverse_dict, tuple_choices, ) +from config.default import StorageType # 此值为历史遗留,后续蓝鲸不使用此字段后可废弃 DEFAULT_SUPPLIER_ID = 0 -LINUX_SEP = "/" -WINDOWS_SEP = "\\" - ######################################################################################################## # 任务超时控制 ######################################################################################################## + + +class TimeUnit: + SECOND = 1 + MINUTE = SECOND * 60 + HOUR = MINUTE * 60 + DAY = HOUR * 24 + + TASK_TIMEOUT = 0 # 脚本超时控制在180s=3min TASK_MAX_TIMEOUT = 3600 # 脚本超时控制在180s=3min JOB_MAX_RETRY = 60 # 默认轮询作业最大次数 100次=3min @@ -56,6 +64,35 @@ CC_HOST_FIELDS = ["bk_host_id", "bk_cloud_id", "bk_host_innerip", "bk_host_outerip", "bk_os_type", "bk_os_name"] + +######################################################################################################## +# 字符串常量 +######################################################################################################## + +LINUX_SEP = "/" + +WINDOWS_SEP = "\\" + +# 临时文件存放位置 +TMP_DIR = "/tmp" + +# 临时文件名格式模板 +TMP_FILE_NAME_FORMAT = "nm_tf_{name}" + + +class PluginExternalTypePrefix(Enum): + EXTERNAL = "external_plugins" + NORMAL = "plugins" + + @classmethod + def get_optional_items(cls) -> List[str]: + return [cls.EXTERNAL.value, cls.NORMAL.value] + + +PACKAGE_PATH_RE = re.compile( + "(?Pexternal_)?plugins_(?P(linux|windows|aix))_(?P(x86_64|x86|powerpc|aarch64))" +) + ######################################################################################################## # CHOICES ######################################################################################################## @@ -441,10 +478,6 @@ def get_choices(cls): JOB_IP_STATUS_CHOICES = tuple_choices(JOB_IP_STATUS_TUPLE) JobIpStatusType = choices_to_namedtuple(JOB_IP_STATUS_CHOICES) -PACKAGE_PATH_RE = re.compile( - "(?Pexternal_)?plugins_(?P(linux|windows|aix))_(?P(x86_64|x86|powerpc|aarch64))" -) - SYNC_CMDB_HOST_BIZ_BLACKLIST = "SYNC_CMDB_HOST_BIZ_BLACKLIST" # 周期任务相关 @@ -665,8 +698,10 @@ class PolicyRollBackType: ROLLBACK_TYPE__ALIAS_MAP = {SUPPRESSED: "已被其他策略管控", LOSE_CONTROL: "脱离策略管控", TRANSFER_TO_ANOTHER: "转移到优先级最高的策略"} -class TimeUnit: - SECOND = 1 - MINUTE = SECOND * 60 - HOUR = MINUTE * 60 - DAY = HOUR * 24 +class CosBucketEnum(Enum): + PUBLIC = "public" + + PRIVATE = "private" + + +COS_TYPES = [StorageType.BKREPO.value] diff --git a/apps/node_man/exceptions.py b/apps/node_man/exceptions.py index 0cb244853..89f9c577f 100644 --- a/apps/node_man/exceptions.py +++ b/apps/node_man/exceptions.py @@ -202,3 +202,9 @@ class ApNotSupportOsError(NodeManBaseException): class PolicyIsRunningError(NodeManBaseException): MESSAGE_TPL = _("策略 -> {policy_id}「{name}」正在执行") ERROR_CODE = 36 + + +class PluginUploadError(NodeManBaseException): + MESSAGE = _("插件上传失败") + MESSAGE_TPL = _("插件上传失败: plugin_name -> {plugin_name}, error -> {error}") + ERROR_CODE = 37 diff --git a/apps/node_man/handlers/plugin_v2.py b/apps/node_man/handlers/plugin_v2.py index 0922d6e0b..606fd331d 100644 --- a/apps/node_man/handlers/plugin_v2.py +++ b/apps/node_man/handlers/plugin_v2.py @@ -11,16 +11,16 @@ import json import os import random -import shutil -import uuid from collections import ChainMap, defaultdict from typing import Any, Dict, List import requests from django.conf import settings from django.core.cache import cache +from django.core.files.uploadedfile import InMemoryUploadedFile from django.utils.translation import ugettext_lazy as _ +from apps.core.files.storage import get_storage from apps.node_man import constants, exceptions, models, tools from apps.node_man.constants import DEFAULT_CLOUD_NAME, IamActionType from apps.node_man.handlers.cmdb import CmdbHandler @@ -28,27 +28,70 @@ from apps.node_man.handlers.iam import IamHandler from apps.utils.basic import distinct_dict_list, list_slice from apps.utils.concurrent import batch_call +from apps.utils.files import md5sum from apps.utils.local import get_request_username from common.api import NodeApi class PluginV2Handler: @staticmethod - def upload(package_file, module, username): - tmp_dir = os.path.join("/tmp/", uuid.uuid4().hex) - os.mkdir(tmp_dir) - tmp_path = os.path.join(tmp_dir, package_file.name) - with open(tmp_path, "wb") as tmp_file: - for chunk in package_file.chunks(): - tmp_file.write(chunk) - md5 = tools.PluginV2Tools.get_file_md5(tmp_path) - with open(tmp_path, "rb") as tf: - response = requests.post( - url=settings.BKAPP_NODEMAN_UPLOAD_URL, - data={"bk_app_code": settings.APP_CODE, "bk_username": username, "module": module, "md5": md5}, - files={"package_file": tf}, - ) - shutil.rmtree(tmp_dir) + def upload(package_file: InMemoryUploadedFile, module: str) -> Dict[str, Any]: + """ + 将文件上传至 + :param package_file: InMemoryUploadedFile + :param module: 所属模块 + :return: + { + "result": True, + "message": "", + "code": "00", + "data": { + "id": record.id, # 上传文件记录ID + "name": record.file_name, # 包名 + "pkg_size": record.file_size, # 大小, + } + } + """ + with package_file.open("rb") as tf: + + # 计算上传文件的md5 + md5 = md5sum(file_obj=tf, closed=False) + + # 构造通用参数 + upload_params = { + "url": settings.DEFAULT_FILE_UPLOAD_API, + "data": { + "bk_app_code": settings.APP_CODE, + "bk_username": get_request_username(), + "module": module, + "md5": md5, + }, + } + + # 如果采用对象存储,文件直接上传至仓库,并将返回的目标路径传到后台,由后台进行校验并创建上传记录 + # TODO 后续应该由前端上传文件并提供md5 + if settings.STORAGE_TYPE in constants.COS_TYPES: + storage = get_storage() + + try: + storage_path = storage.save(name=os.path.join(settings.UPLOAD_PATH, tf.name), content=tf) + except Exception as e: + raise exceptions.PluginUploadError(plugin_name=tf.name, error=e) + + upload_params["data"].update( + { + # 最初文件上传的名称,后台会使用该文件名保存并覆盖同名文件 + "file_name": tf.name, + "file_path": storage_path, + "download_url": storage.url(storage_path), + } + ) + else: + # 本地文件系统仍通过上传文件到Nginx并回调后台 + upload_params["files"] = {"package_file": tf} + + response = requests.post(**upload_params) + return json.loads(response.content) @staticmethod diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 5dda547c7..c5b084e80 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -10,7 +10,6 @@ """ import base64 import copy -import errno import hashlib import json import os @@ -22,6 +21,7 @@ import uuid from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed +from distutils.dir_util import copy_tree from enum import Enum from functools import cmp_to_key from typing import Any, Dict, List, Optional, Set, Union @@ -45,6 +45,7 @@ from apps.backend.subscription.errors import PipelineExecuteFailed, SubscriptionNotExist from apps.backend.subscription.render_functions import get_hosts_by_node from apps.backend.utils.data_renderer import nested_render_data +from apps.core.files.storage import get_storage from apps.exceptions import ValidationError from apps.node_man import constants as const from apps.node_man.exceptions import ( @@ -53,8 +54,7 @@ CreateRecordError, QueryGlobalSettingsException, ) -from apps.utils import env, orm -from apps.utils.basic import md5 +from apps.utils import env, files, orm from common.log import logger from pipeline.parser import PipelineParser from pipeline.service import task_service @@ -912,8 +912,9 @@ def create_record( """ 给定一个插件的路径,分析路径下的project.yaml,生成压缩包到nginx(多台)目录下 !!!注意:该任务可能会导致长期的卡顿,请务必注意不要再wsgi等单线程环境中调用!!! - :param dir_path: 需要进行打包的插件路径, 例如,plugin_a路径,路径下放置了插件各个文件 - :param package_os: 插件包支持的系统 + :param dir_path: 需要进行打包的插件包临时解压路径, 例如,plugin_a 路径,路径下放置了插件包各个文件 + ⚠️ 该路径应为本地临时路径,插件包已从存储源下载到该路径 + :param package_os: 插件包支持的操作系统类型 :param cpu_arch: 插件支持的CPU架构 :param is_external: 是否第三方插件 :param creator: 操作人 @@ -923,19 +924,23 @@ def create_record( :return: True | raise Exception """ # 1. 判断是否存在project.yaml文件 - project_file_path = os.path.join(dir_path, "project.yaml") - if not os.path.exists(project_file_path): - logger.error("try to pack path->[%s] but is not [project.yaml] file under file path" % dir_path) - raise CreateRecordError(_("找不到 {} project.yaml文件,打包失败").format(dir_path)) + project_yaml_file_path = os.path.join(dir_path, "project.yaml") + if not os.path.exists(project_yaml_file_path): + logger.error( + "try to pack path-> {dir_path} but is not [project.yaml] file under file path".format(dir_path=dir_path) + ) + raise CreateRecordError(_("找不到 {dir_path} project.yaml文件,打包失败").format(dir_path=dir_path)) # 2. 解析project.yaml文件(版本,插件名等信息) try: - with open(project_file_path, "r", encoding="utf-8") as project_file: + with open(project_yaml_file_path, "r", encoding="utf-8") as project_file: yaml_config = yaml.safe_load(project_file) except (IOError, yaml.YAMLError) as error: logger.error( - "failed to parse or read project_yaml->[{}] for->[{}]".format(project_file_path, traceback.format_exc()) + "failed to parse or read project_yaml -> {project_yaml_file_path}, err_msg -> {err_msg}".format( + project_yaml_file_path=project_yaml_file_path, err_msg=traceback.format_exc() + ) ) six.raise_from(error, error) @@ -943,37 +948,49 @@ def create_record( # 解析版本号转为字符串,防止x.x情况被解析为浮点型,同时便于后续写入及比较 yaml_config["version"] = str(yaml_config["version"]) - package_name = yaml_config["name"] + project = yaml_config["name"] version = yaml_config["version"] control_info = yaml_config.get("control", {}) except KeyError as error: logger.error( - "failed to get key info from project.yaml->[%s] for->[%s] maybe config file error?" - % (project_file_path, traceback.format_exc()) + "failed to get key info from project.yaml -> {project_yaml_file_path}, " + "err_msg -> {err_msg} maybe config file error?".format( + project_yaml_file_path=project_yaml_file_path, err_msg=traceback.format_exc() + ) + ) + raise CreateRecordError( + _("配置文件 -> {project_yaml_file_path} 信息缺失,请确认后重试, 缺失字段 -> {err_msg}").format( + project_yaml_file_path=project_yaml_file_path, err_msg=error + ) ) - raise CreateRecordError(_("配置文件{}信息缺失,请确认后重试, 缺失字段: {}".format(project_file_path, error))) # 判断之前是否已经有发布过的该插件版本 - exists_object_list = cls.objects.filter(project=package_name, version=version, os=package_os, cpu_arch=cpu_arch) - if exists_object_list.filter(is_release_version=True).exists(): - logger.error( - "project->[%s] version->[%s] os->[%s] cpu_arch->[%s] is release, no more operations is " - "allowed." % (package_name, version, package_os, cpu_arch) + packages_queryset = cls.objects.filter(project=project, version=version, os=package_os, cpu_arch=cpu_arch) + if packages_queryset.filter(is_release_version=True).exists(): + logger.warning( + "project -> {project} version -> {version} os -> {os} cpu_arch -> {cpu_arch} is release, " + "will overwrite it".format(project=project, version=version, os=package_os, cpu_arch=cpu_arch) ) # 判断插件类型是否符合预期 if yaml_config["category"] not in const.CATEGORY_TUPLE: logger.error( - "project->[%s] version->[%s] update(or create) with category->[%s] which is not acceptable, " - "nothing will do." % (package_name, version, yaml_config["category"]) + "project -> {project} version -> {version} update(or create) with category -> {category} which " + "is not acceptable, nothing will do.".format( + project=project, version=version, category=yaml_config["category"] + ) + ) + raise CreateRecordError( + _("project.yaml中 category 配置有误,可选项 -> {category_tuple},请确认后重试").format( + category_tuple=const.CATEGORY_TUPLE + ) ) - raise ValueError(_("project.yaml中category配置异常,请确认后重试")) # 3. 创建新的插件包信息 # 判断是否已经由插件描述信息,需要写入 desc, created = GsePluginDesc.objects.update_or_create( - name=package_name, + name=project, defaults=dict( description=yaml_config.get("description", ""), scenario=yaml_config.get("scenario", ""), @@ -990,19 +1007,23 @@ def create_record( ), ) if created: - logger.info("desc->[{}] for pack->[{}] is created".format(desc.id, package_name)) + logger.info( + "plugin_desc_id -> {plugin_desc_id} for project -> {project} is created".format( + plugin_desc_id=desc.id, project=project + ) + ) # 写入插件包信息 - file_name = "{}-{}.tgz".format(package_name, version) - if not exists_object_list.exists(): + pkg_name = f"{project}-{version}.tgz" + if not packages_queryset.exists(): # 如果之前未有未发布的插件包信息,需要新建 record = cls.objects.create( - pkg_name=file_name, + pkg_name=pkg_name, version=version, module="gse_plugin", # TODO: 留坑 creator=creator if creator is not None else settings.SYSTEM_USE_API_ACCOUNT, - project=package_name, + project=project, pkg_size=0, pkg_path="", md5="", @@ -1016,16 +1037,14 @@ def create_record( ) else: # 否则,更新已有的记录即可 - record = exists_object_list[0] + record = packages_queryset[0] - path_info = env.get_gse_env_path(package_name, is_windows=(package_os == "windows")) + path_info = env.get_gse_env_path(project, is_windows=(package_os == "windows")) try: proc_control = ProcControl.objects.get(plugin_package_id=record.id) except ProcControl.DoesNotExist: - proc_control = ProcControl.objects.create( - module="gse_plugin", project=package_name, plugin_package_id=record.id - ) + proc_control = ProcControl.objects.create(module="gse_plugin", project=project, plugin_package_id=record.id) # 判断是否需要更新配置文件模板 if is_template_load: @@ -1042,37 +1061,40 @@ def create_record( template_file_path = os.path.join(dir_path, source_path) if not os.path.exists(template_file_path): logger.error( - "project.yaml need to import file->[%s] but is not exists, nothing will do." - % templates_info["source_path"] + "project.yaml need to import file -> {source_path} but is not exists, nothing will do.".format( + source_path=templates_info["source_path"] + ) + ) + raise CreateRecordError(_("找不到需要导入的配置模板文件 -> {source_path}").format(source_path=source_path)) + + with open(template_file_path) as template_fs: + config_template_obj, __ = PluginConfigTemplate.objects.update_or_create( + plugin_name=record.project, + plugin_version=templates_info["plugin_version"], + name=templates_info["name"], + version=templates_info["version"], + is_main=is_main_config, + defaults=dict( + format=templates_info["format"], + file_path=templates_info["file_path"], + content=template_fs.read(), + is_release_version=is_release, + creator="system", + create_time=timezone.now(), + source_app_code="bk_nodeman", + ), ) - raise IOError(_("找不到需要导入的配置模板文件[%s]") % source_path) - - template, created = PluginConfigTemplate.objects.update_or_create( - plugin_name=record.project, - plugin_version=templates_info["plugin_version"], - name=templates_info["name"], - version=templates_info["version"], - is_main=is_main_config, - defaults=dict( - format=templates_info["format"], - file_path=templates_info["file_path"], - content=open(template_file_path).read(), - is_release_version=is_release, - creator="system", - create_time=timezone.now(), - source_app_code="bk_nodeman", - ), - ) logger.info( - "template->[%s] version->[%s] is create for plugin->[%s] version->[%s] is add" - % (template.name, template.version, record.project, record.version) + "template -> {name} template_version -> {template_version} is create for plugin -> {project} " + "version -> {version}".format( + name=config_template_obj.name, + template_version=config_template_obj.version, + project=record.project, + version=record.version, + ) ) - # 由于文件已经进入到了数据库中,此时需要清理tpl文件 - os.remove(template_file_path) - logger.info("template->[%s] now is delete for info has loaded into database." % template_file_path) - # 更新信息 proc_control.install_path = path_info["install_path"] proc_control.log_path = path_info["log_path"] @@ -1103,61 +1125,72 @@ def create_record( proc_control.port_range = port_range proc_control.save() + logger.info( - "process control->[%s] for package->[%s] version->[%s] os->[%s] is created." - % (proc_control.id, package_name, version, package_os) + "process control -> {id} for plugin -> {project} version -> {version} os -> {os} is created.".format( + id=proc_control.id, project=project, version=version, os=package_os + ) ) # 4. 打包创建新的tar包 - file_name = "{}-{}.tgz".format(package_name, version) - temp_file_path = "/tmp/{}-{}-{}-{}.tgz".format(package_name, version, package_os, cpu_arch) - nginx_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, record.os, record.cpu_arch, file_name) - - try: - # 尝试创建 Nginx download path,已存在则忽略 - os.makedirs(os.path.dirname(nginx_path)) - except OSError as e: - if e.errno != errno.EEXIST: - raise e - - with tarfile.open(temp_file_path, "w:gz") as tfile: - tfile.add( + temp_file_path = os.path.join(const.TMP_DIR, f"{project}-{version}-{package_os}-{cpu_arch}.tgz") + with tarfile.open(temp_file_path, "w:gz") as tf: + tf.add( dir_path, # 判断是否第三方插件的路径 - arcname="external_plugins/%s" % package_name if is_external else "plugins/", + arcname=f"external_plugins/{project}" if is_external else f"plugins/{project}", ) logger.info( - "package->[%s] version->[%s] now is pack to temp_path->[%s], ready to send to nginx." - % (package_name, version, file_name) + "project -> {project} version -> {version} now is pack to temp_file_path -> {temp_file_path}".format( + project=project, version=version, temp_file_path=temp_file_path + ) ) - # 4. 文件SCP转移到nginx路径下 - # 注意:此处需要依赖 NGINX_DOWNLOAD_PATH 挂载到 NFS - shutil.copy(temp_file_path, nginx_path) + # 4. 插件包保存 + package_target_path = os.path.join(settings.DOWNLOAD_PATH, record.os, record.cpu_arch, pkg_name) + with open(temp_file_path, mode="rb") as tf: + # 采用同名覆盖策略,保证同版本插件包仅保存一份 + storage_path = get_storage(file_overwrite=True).save(package_target_path, tf) + if storage_path != package_target_path: + logger.error( + "package save error, except save to -> {package_target_path}, but -> {storage_path}".format( + package_target_path=package_target_path, storage_path=storage_path + ) + ) + raise CreateRecordError( + _("插件包保存错误,期望保存到 -> {package_target_path}, 实际保存到 -> {storage_path}").format( + package_target_path=package_target_path, storage_path=storage_path + ) + ) # 5. 标记已经完成同步及其他信息 record.is_ready = True - record.pkg_ctime = record.pkg_mtime = str(timezone.now()) + record.pkg_mtime = str(timezone.now()) + # pkg_ctime 仅记录该插件包信息的创建时间 + record.pkg_ctime = record.pkg_ctime or record.pkg_mtime record.pkg_size = os.path.getsize(temp_file_path) - record.pkg_path = os.path.dirname(nginx_path) - record.md5 = md5(temp_file_path) + record.pkg_path = os.path.dirname(package_target_path) + record.md5 = files.md5sum(name=temp_file_path) # 这里没有加上包名,是因为原本脚本(bkee/bkce)中就没有加上,为了防止已有逻辑异常,保持一致 # 后面有哪位发现这里不适用了,可以一并修改 - record.location = "http://{}/download/{}/{}".format(os.getenv("LAN_IP"), package_os, cpu_arch) + record.location = f"http://{os.getenv('LAN_IP')}/download/{package_os}/{cpu_arch}" record.save() + logger.info( - "plugin->[{}] version->[{}] now is sync to nginx ready to use.".format(record.project, record.version) + "package -> {pkg_name}, package_target_path -> {package_target_path} now is ready to use".format( + pkg_name=pkg_name, package_target_path=package_target_path + ) ) # 清理临时文件 os.remove(temp_file_path) - logger.info("clean temp tgz file -> [{temp_file_path}] done.".format(temp_file_path=temp_file_path)) + logger.info("clean temp tgz file -> {temp_file_path} done.".format(temp_file_path=temp_file_path)) return record @classmethod - def export_plugins(cls, project, version, os_type=None, cpu_arch=None): + def export_plugins(cls, project: str, version: str, os_type: str = None, cpu_arch: str = None) -> Dict[str, str]: """ 导出指定插件 !!! 注意:该方法会有打包及同步等高延迟的动作,请勿在同步环境(uwsgi)下使用 !!! @@ -1176,8 +1209,8 @@ def export_plugins(cls, project, version, os_type=None, cpu_arch=None): filter_params["cpu_arch"] = cpu_arch # 1. 确认需要导出的文件 # 注意:未完成发布及nginx准备的插件不可导出 - plugin_list = cls.objects.filter(**filter_params, is_ready=True, is_release_version=True) - if not plugin_list.exists(): + package_objs = cls.objects.filter(**filter_params, is_ready=True, is_release_version=True) + if not package_objs.exists(): logger.error( "user try to export plugin project->[{project}] version->[{version}] " "filter_params->[{filter_params}] but is not exists, nothing will do.".format( @@ -1187,142 +1220,110 @@ def export_plugins(cls, project, version, os_type=None, cpu_arch=None): raise ValueError(_("找不到可导出插件,请确认后重试")) # 临时的解压目录 - temp_path = "/tmp/%s" % uuid.uuid4().hex - # 临时的压缩包路径 - temp_file_path = "/tmp/%s.tgz" % uuid.uuid4().hex + local_unzip_target_dir = files.mk_and_return_tmpdir() + # 暂存导出插件的文件路径 + export_plugin_tmp_path = os.path.join( + const.TMP_DIR, const.TMP_FILE_NAME_FORMAT.format(name=f"{uuid.uuid4().hex}.tgz") + ) # 2. 各个插件解压到指定的目录 - for plugin in plugin_list: - plugin.unzip(temp_path) + for package_obj in package_objs: + package_obj.unzip(local_unzip_target_dir) logger.info( - "plugin->[{}] os->[{}] cpu->[{}] unzip success.".format(plugin.pkg_name, plugin.os, plugin.cpu_arch) + "package -> {pkg_name} os -> {os} cpu -> {cpu_arch} unzip success.".format( + pkg_name=package_obj.pkg_name, os=package_obj.os, cpu_arch=package_obj.cpu_arch + ) ) - # 3. 解压的指定目录打包 - with tarfile.open(temp_file_path, "w:gz") as tar_file: + # 3. 将解压的各个插件包打包成一个完整的插件 + with tarfile.open(export_plugin_tmp_path, "w:gz") as tar_file: # temp_path下的内容由于是从plugin处解压获得,所以应该已经符合external_plugins或者plugins的目录规范 # 此处则不再指定 - tar_file.add(temp_path, ".") + tar_file.add(local_unzip_target_dir, ".") logger.debug( - "export plugin->[%s] version->[%s] create temp_file->[%s] from path->[%s] success, " - "ready to trans to nginx." % (project, version, temp_file_path, temp_path) + "export plugin -> {plugin_name} version -> {version} create export tmp path -> {export_plugin_tmp_path} " + "from path-> {local_unzip_target_dir} success, ready to storage".format( + plugin_name=project, + version=version, + export_plugin_tmp_path=export_plugin_tmp_path, + local_unzip_target_dir=local_unzip_target_dir, + ) ) - # 4. 同步到nginx指定目录 - file_name = "{}-{}-{}.tgz".format(project, version, md5(temp_file_path)) - - if not os.path.exists(settings.EXPORT_PATH): - os.makedirs(settings.EXPORT_PATH) + # 4. 将导出的插件上传到存储源 + plugin_export_target_path = os.path.join( + settings.EXPORT_PATH, f"{project}-{version}-{files.md5sum(name=export_plugin_tmp_path)}.tgz" + ) + with open(export_plugin_tmp_path, mode="rb") as tf: + storage_path = get_storage().save(plugin_export_target_path, tf) - download_file_path = os.path.join(settings.EXPORT_PATH, file_name) - shutil.copy(temp_file_path, download_file_path) logger.info( - "plugin->[{}] version->[{}] export file->[{}] is ready".format(project, version, download_file_path) + "export done: plugin-> {plugin_name} version -> {version} export file -> {storage_path}".format( + plugin_name=project, version=version, storage_path=storage_path + ) ) - logger.info("plugin->[{}] version->[{}] export job success.".format(project, version)) - # 清除临时文件 - shutil.rmtree(temp_path) - os.remove(temp_file_path) + os.remove(export_plugin_tmp_path) + shutil.rmtree(local_unzip_target_dir) + logger.info( - "clean temp tgz file -> [{temp_file_path}], temp path -> [{temp_path}] done.".format( - temp_file_path=temp_file_path, temp_path=temp_path + "plugin -> {plugin_name} version -> {version} export job success.".format( + plugin_name=project, version=version ) ) - return {"file_path": download_file_path} + return {"file_path": storage_path} - def unzip(self, target_path): + def unzip(self, local_target_dir: str) -> None: """ 将一个指定的插件解压到指定的目录下 - :param target_path: 指定的解压目录 + :param local_target_dir: 指定的解压目录 :return: True | raise Exception """ + storage = get_storage() file_path = os.path.join(self.pkg_path, self.pkg_name) - # 1. 获取文件 - if not os.path.exists(file_path): + # 校验插件包是否存在 + if not storage.exists(file_path): logger.error( - "try to unzip package->[{}] but file_path->[{}] is not exists, nothing will do.".format( - self.pkg_name, file_path + "try to unzip package-> {pkg_name} but file_path -> {file_path} is not exists, nothing will do.".format( + pkg_name=self.pkg_name, file_path=file_path ) ) - raise ValueError(_("插件文件不存在,请联系管理员处理")) - - # 2. 解压到指定的目录 - with tarfile.open(file_path) as tar_file: - - file_members = tar_file.getmembers() - - # 判断获取需要解压到的目标位置 - if "external_plugins" in file_members[0].name: - # 第三方插件的导出 - # 目标路径变更为:${target_path}/external_plugins_linux_x86/${project_name}/ - target_path = os.path.join( - target_path, "external_plugins_{}_{}".format(self.os, self.cpu_arch), self.project - ) - logger.info( - "project->[%s] version->[%s] is external_plugins so set target_path->[%s]" - % (self.project, self.version, target_path) - ) - plugin_root_path = "external_plugins/%s/" % self.project - type_root_path = "external_plugins/" - - else: - # 目标路径变更为:${target_path}/plugins_linux_x86/${project_name}/ - target_path = os.path.join(target_path, "plugins_{}_{}".format(self.os, self.cpu_arch), self.project) - logger.info( - "project->[%s] version->[%s] is offical plugins so set target_path->[%s]" - % (self.project, self.version, target_path) - ) - plugin_root_path = "plugins/%s/" % self.project - type_root_path = "plugins/" - - if not os.path.exists(target_path): - os.makedirs(target_path) - logger.info("temp path->[{}] for package->[{}] is created".format(target_path, self.pkg_name)) - - # 对所有的内容进行遍历,然后找到是文件的内容,解压到我们的目标路径上 - for member in file_members: - - # 如果是类型的层级文件夹,跳过 - if member.name == plugin_root_path[:-1] or member.name == type_root_path[:-1]: - logger.info( - "path->[{}] plugin_root_path->[{}] type_root_path->[{}] jump it".format( - member.name, plugin_root_path, type_root_path - ) - ) - continue - - # 解压时,只关注最底层的文件名及文件夹 - # 上层的external_plugins/project_name废弃 - file_name = member.name.replace(plugin_root_path, "") - logger.info( - "path->[{}] is extract to->[{}] with replace_root->[{}]".format( - member.name, file_name, plugin_root_path - ) - ) - current_target_path = os.path.join(target_path, file_name) + raise ValueError(_("插件包不存在,请联系管理员处理")) + + # 将插件包解压到临时目录中 + package_tmp_dir = files.mk_and_return_tmpdir() + # 文件的读取是从指定数据源(NFS或对象存储),可切换源模式,不直接使用原生open + with storage.open(name=file_path, mode="rb") as tf_from_storage: + with tarfile.open(fileobj=tf_from_storage) as tf: + tf.extractall(path=package_tmp_dir) + + # 遍历插件包的一级目录,找出 PluginExternalTypePrefix 匹配的文件夹并加入到指定的解压目录 + # 一般来说,插件包是具体到机器操作系统类型的,所以 package_tmp_dir 下基本只有一个目录 + for external_type_prefix in os.listdir(package_tmp_dir): + if external_type_prefix not in const.PluginExternalTypePrefix.get_optional_items(): + continue + # 将匹配的目录拷贝并格式化命名 + # 关于拷贝目录,参考:https://stackoverflow.com/questions/1868714/ + copy_tree( + src=os.path.join(package_tmp_dir, external_type_prefix), + dst=os.path.join(local_target_dir, f"{external_type_prefix}_{self.os}_{self.cpu_arch}"), + ) - # 此处使用私有方法,是因为改名没有其他方式了 - # 如果其他大锅有更好的方案,欢迎修改。。。囧 - tar_file._extract_member(member, current_target_path) - logger.info( - "project->[%s] version->[%s] file->[%s] is extract to->[%s]" - % (self.project, self.version, member.name, current_target_path) - ) + # 移除临时解压目录 + shutil.rmtree(package_tmp_dir) logger.info( - "package->[{}] os->[{}] cpu->[{}] unzip to path->[{}] success.".format( - self.pkg_name, self.os, self.cpu_arch, target_path + "package-> {pkg_name} os -> {os} cpu_arch -> {cpu_arch} unzip to " + "path -> {local_target_dir} success.".format( + pkg_name=self.pkg_name, os=self.os, cpu_arch=self.cpu_arch, local_target_dir=local_target_dir ) ) - return True - class Meta: verbose_name = _("模块/工程安装包信息表") verbose_name_plural = _("模块/工程安装包信息表") @@ -1465,210 +1466,69 @@ def create_record(cls, module, file_path, md5, operator, source_app_code, file_n """ 创建一个新的上传记录 :param module: 文件模块 - :param file_path: 文件在机器上的本地路径 + :param file_path: 文件源路径 :param md5: 文件MD5 :param operator: 操作者 :param source_app_code: 上传来源APP_CODE - :param file_name: 文件上传前的名字 + :param file_name: 期望的文件保存名,在非文件覆盖的情况下,该名称不是文件最终的保存名 :param is_file_copy: 是否复制而非剪切文件,适应初始化内置插件需要使用 :return: upload record """ # 注意:MD5参数值将会直接使用,因为服务器上的MD5是由nginx协助计算,应该在views限制 - # 1. 判断文件是否已经存在 - if not os.path.exists(file_path): - logger.warning( - "user->[{}] try to create record for file->[{}] but is not exists.".format(operator, file_path) - ) - raise CreateRecordError(_("文件{file_path}不存在,请确认后重试").format(file_path=file_path)) + try: + storage = get_storage() + # 判断文件是否已经存在 + if not storage.exists(file_path): + logger.warning( + "operator -> {operator} try to create record for file -> {file_path} but is not exists.".format( + operator=operator, file_path=file_path + ) + ) + raise CreateRecordError(_("文件{file_path}不存在,请确认后重试").format(file_path=file_path)) - # 判断上传文件的路径是否已经存在 - if not os.path.exists(settings.UPLOAD_PATH): - os.makedirs(settings.UPLOAD_PATH) - logger.info("path->[{}] is not exists, and now is created by us.".format(settings.UPLOAD_PATH)) + target_file_path = os.path.join(settings.UPLOAD_PATH, file_name) - # 3. 文件迁移到public - new_file_path = os.path.join(settings.UPLOAD_PATH, file_name) + # 如果读写路径一致无需拷贝文件,此处 target_file_path / file_path 属于同个文件源 + if target_file_path != file_path: + with storage.open(name=file_path, mode="rb") as fs: + # 不允许覆盖同名文件的情况下,文件名会添加随机串,此时 target_file_path / file_name 应刷新 + target_file_path = storage.save(name=target_file_path, content=fs) + file_name = os.path.basename(target_file_path) - try: - if is_file_copy: - shutil.copy(file_path, new_file_path) - else: - shutil.move(file_path, new_file_path) - except IOError: + # 如果是通过 mv 拷贝到指定目录,此时原文件应该删除 + if not is_file_copy: + storage.delete(file_path) + + record = cls.objects.create( + file_name=file_name, + module=module, + file_path=target_file_path, + file_size=storage.size(target_file_path), + md5=md5, + upload_time=timezone.now(), + creator=operator, + source_app_code=source_app_code, + ) + + except Exception: logger.error( - "failed to mv source_file->[%s] to targe_path->[%s] for->[%s]" - % (file_path, new_file_path, traceback.format_exc()) + "failed to mv source_file -> {file_path} to target_file_path -> {target_file_path}, " + "err_msg -> {err_msg}".format( + file_path=file_path, target_file_path=target_file_path, err_msg=traceback.format_exc() + ) ) raise CreateRecordError(_("文件迁移失败,请联系管理员协助处理")) - record = cls.objects.create( - file_name=file_name, - module=module, - file_path=new_file_path, - file_size=os.path.getsize(new_file_path), - md5=md5, - upload_time=timezone.now(), - creator=operator, - source_app_code=source_app_code, - ) logger.info( - "new record for file->[%s] module->[%s] is added by operator->[%s] from system->[%s]." - % (file_path, module, operator, source_app_code) + "new record for file -> {file_path} module -> {module} is added by operator -> {operator} " + "from system -> {source_app_code}.".format( + file_path=file_path, module=module, operator=operator, source_app_code=source_app_code + ) ) return record - def create_package_records( - self, is_release, creator=None, select_pkg_abs_paths=None, is_template_load=False, is_template_overwrite=False - ): - """ - 拆解一个上传包并将里面的插件录入到package表中 - :param is_release: 是否正式发布 - :param creator: 操作人 - :param select_pkg_abs_paths: 指定注册包名列表 - :param is_template_load: 是否需要读取配置文件 - :param is_template_overwrite: 是否可以覆盖已经存在的配置文件 - :return: [package_object, ...] - """ - # 1. 解压压缩文件 - package_result = [] - temp_path = "/tmp/%s" % uuid.uuid4().hex - - with tarfile.open(self.file_path) as tfile: - # 检查是否存在可疑内容 - for file_info in tfile.getmembers(): - if file_info.name.startswith("/") or "../" in file_info.name: - logger.error( - "WTF? file->[{}] contains member->[{}] try to escape! We won't use it.".format( - self.file_path, file_info.name - ) - ) - raise CreateRecordError(_("文件包含非法路径成员[{name}],请检查").format(name=file_info.name)) - - logger.info("file->[{}] extract to path->[{}] success.".format(self.file_path, temp_path)) - tfile.extractall(path=temp_path) - - # 2. 遍历第一层的内容,得知当前的操作系统和cpu架构信息 - with transaction.atomic(): - for first_path in os.listdir(temp_path): - re_match = const.PACKAGE_PATH_RE.match(first_path) - if re_match is None: - logger.info("path->[%s] is not match re, jump it." % first_path) - continue - - path_dict = re_match.groupdict() - current_os = path_dict["os"] - cpu_arch = path_dict["cpu_arch"] - logger.info("path->[{}] is match for os->[{}] cpu->[{}]".format(first_path, current_os, cpu_arch)) - - # 遍历第二层的内容,得知当前的插件名 - abs_first_path = os.path.join(temp_path, first_path) - for second_path in os.listdir(abs_first_path): - # 注册新的内容,并触发同步 - # second_path 是包名 - abs_path = os.path.join(abs_first_path, second_path) - - if not os.path.isdir(abs_path): - logger.info("found file path->[%s] jump it" % abs_path) - continue - if select_pkg_abs_paths is not None and f"{first_path}/{second_path}" not in select_pkg_abs_paths: - logger.info("path->[%s] not select, jump it" % abs_path) - continue - record = Packages.create_record( - dir_path=abs_path, - package_os=current_os, - cpu_arch=cpu_arch, - is_release=is_release, - creator=creator, - is_external=path_dict["is_external"] is not None, - is_template_load=is_template_load, - is_template_overwrite=is_template_overwrite, - ) - - logger.info("package->[{}] now add record->[{}] success.".format(self.file_name, record.id)) - package_result.append(record) - - # 3. 完成 - logger.info("now package->[%s] is all add done." % self.file_name) - - # 4. 清理临时文件夹 - shutil.rmtree(temp_path) - logger.info("clean temp path -> [{temp_path}] done.".format(temp_path=temp_path)) - return package_result - - def list_package_infos(self): - """ - 解析`self.file_path`下插件,获取包信息字典列表 - :return: [ - { - # 插件包的相对路径 - "pkg_abs_path": "plugins_linux_x86_64/package_name" - # 插件包目录路径 - "dir_path": "/tmp/12134/plugins_linux_x86_64/package_name", - # 插件所需操作系统 - "package_os": "linux", - # 支持cpu位数 - "cpu_arch": "x86_64", - # 是否为自定义插件 - "is_external": "False" - }, - ... - ] - """ - # 1. 解压压缩文件 - temp_path = "/tmp/%s" % uuid.uuid4().hex - with tarfile.open(self.file_path) as tfile: - # 检查是否存在可疑内容 - for file_info in tfile.getmembers(): - if file_info.name.startswith("/") or "../" in file_info.name: - logger.error( - "WTF? file->[{}] contains member->[{}] try to escape! We won't use it.".format( - self.file_path, file_info.name - ) - ) - raise ValueError(_("文件包含非法路径成员[%s],请检查") % file_info.name) - logger.info("file->[{}] extract to path->[{}] success.".format(self.file_path, temp_path)) - tfile.extractall(path=temp_path) - - package_infos = [] - # 遍历第一层的内容,获取操作系统和cpu架构信息,eg:external(可无,有表示自定义插件)_plugins_linux_x86_64 - for first_plugin_dir_name in os.listdir(temp_path): - # 通过正则提取出插件(plugin)目录名中的插件信息 - re_match = const.PACKAGE_PATH_RE.match(first_plugin_dir_name) - if re_match is None: - logger.info("path->[%s] is not match re, jump it." % first_plugin_dir_name) - continue - - # 将文件名解析为插件信息字典 - plugin_info_dict = re_match.groupdict() - current_os = plugin_info_dict["os"] - cpu_arch = plugin_info_dict["cpu_arch"] - logger.info( - "path->[{}] is match for os->[{}] cpu->[{}]".format(first_plugin_dir_name, current_os, cpu_arch) - ) - - first_level_plugin_path = os.path.join(temp_path, first_plugin_dir_name) - # 遍历第二层的内容,获取包名, eg:plugins_linux_x86_64/package_name - for second_package_dir_name in os.listdir(first_level_plugin_path): - # 拼接获取包路径 - second_level_package_dir_path = os.path.join(first_level_plugin_path, second_package_dir_name) - if not os.path.isdir(second_level_package_dir_path): - logger.info("found file path->[%s] jump it" % second_level_package_dir_path) - continue - - package_infos.append( - { - "pkg_abs_path": f"{first_plugin_dir_name}/{second_package_dir_name}", - "dir_path": second_level_package_dir_path, - "package_os": current_os, - "cpu_arch": cpu_arch, - "is_external": plugin_info_dict["is_external"] is not None, - } - ) - - return package_infos - class DownloadRecord(models.Model): """ @@ -1740,7 +1600,7 @@ def download_key(self): return md5.hexdigest() @classmethod - def create_record(cls, category, query_params, creator, source_app_code): + def create_record(cls, category: str, query_params: Dict[str, Any], creator: str, source_app_code: str): """ 创建下载任务记录 :param category: 下载文件类型 @@ -1750,13 +1610,6 @@ def create_record(cls, category, query_params, creator, source_app_code): :return: download record """ - if category not in cls.CATEGORY_TASK_DICT: - logger.error( - "user->[%s] from source_app->[%s] request category->[%s] is not supported now, " - "nothing will do." % (creator, source_app_code, category) - ) - raise ValueError(_("请求下载类型[%s]暂不支持,请确认后重试") % category) - record = cls.objects.create( category=category, query_params=json.dumps(query_params), @@ -1768,8 +1621,9 @@ def create_record(cls, category, query_params, creator, source_app_code): source_app_code=source_app_code, ) logger.info( - "download record->[{}] is create from app->[{}] for category->[{}] query_params->[{}]".format( - record.id, source_app_code, category, query_params + "download record -> {record_id} is create from app -> {source_app_code} for category -> {category} " + "query_params -> {query_params}".format( + record_id=record.id, source_app_code=source_app_code, category=category, query_params=query_params ) ) @@ -1794,10 +1648,14 @@ def execute(self): self.file_path = result["file_path"] except Exception as error: - logger.error("failed to execute task->[{}] for->[{}]".format(self.id, traceback.format_exc())) + logger.error( + "failed to execute task -> {record_id} for -> {err_msg}".format( + record_id=self.id, err_msg=traceback.format_exc() + ) + ) task_status = self.TASK_STATUS_FAILED - error_message = _("任务失败: %s") % error + error_message = _("任务失败: {err_msg}").format(err_msg=error) six.raise_from(error, error) @@ -1807,8 +1665,9 @@ def execute(self): self.finish_time = timezone.now() self.save() logger.info( - "task->[%s] is done with status->[%s] error_message->[%s]" - % (self.id, self.task_status, self.error_message) + "task -> {record_id} is done with status -> {task_status} error_message -> {err_msg}".format( + record_id=self.id, task_status=self.task_status, err_msg=self.error_message + ) ) diff --git a/apps/node_man/tests/test_pluginv2.py b/apps/node_man/tests/test_pluginv2.py index 4dc70c7de..c83645ccc 100644 --- a/apps/node_man/tests/test_pluginv2.py +++ b/apps/node_man/tests/test_pluginv2.py @@ -67,30 +67,6 @@ def test_list_plugin(self): }, ) - @patch("apps.node_man.handlers.plugin_v2.requests.post", upload_package_return) - def test_upload(self): - class package_file: - name = "123.txt" - - @classmethod - def chunks(cls): - return [b"test1", b"test2"] - - result = PluginV2Handler().upload(package_file=package_file, module="test_module", username="") - self.assertEqual( - result, - { - "result": True, - "message": "", - "code": "00", - "data": { - "id": 21, # 包上传记录ID - "name": "test-0.01.tgz", # 包名 - "pkg_size": "23412434", # 单位byte - }, - }, - ) - @patch("apps.node_man.handlers.cmdb.CmdbHandler.cmdb_or_cache_biz", cmdb_or_cache_biz) def test_list_plugin_host(self): # 构造数据 diff --git a/apps/node_man/tools/plugin_v2.py b/apps/node_man/tools/plugin_v2.py index d231293b7..f456316f7 100644 --- a/apps/node_man/tools/plugin_v2.py +++ b/apps/node_man/tools/plugin_v2.py @@ -9,7 +9,6 @@ specific language governing permissions and limitations under the License. """ -import hashlib import re import traceback from itertools import groupby @@ -29,18 +28,6 @@ class PluginV2Tools: lower_var_path_pattern = re.compile(r"{{\s*[\w.]+\s*\|\s*lower\s*}}") lower_var_name_pattern = re.compile(r"{{\s*([\w.]+)\s*\|\s*lower\s*}}") - @staticmethod - def get_file_md5(file_name): - hash_md5 = hashlib.md5() - try: - with open(file_name, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_md5.update(chunk) - except IOError: - return "-1" - - return hash_md5.hexdigest() - @classmethod def shield_tpl_unparse_content(cls, config_template_content: str): shield_content = config_template_content diff --git a/apps/node_man/views/plugin_v2.py b/apps/node_man/views/plugin_v2.py index 23a27ef04..ef8df9b58 100644 --- a/apps/node_man/views/plugin_v2.py +++ b/apps/node_man/views/plugin_v2.py @@ -458,11 +458,7 @@ def upload(self, request): ser = self.serializer_class(data=request.data) ser.is_valid(raise_exception=True) data = ser.validated_data - return JsonResponse( - PluginV2Handler.upload( - package_file=data["package_file"], module=data["module"], username=get_request_username() - ) - ) + return JsonResponse(PluginV2Handler.upload(package_file=data["package_file"], module=data["module"])) @action(detail=False, methods=["POST"], serializer_class=plugin_v2.PluginFetchConfigVarsSerializer) def fetch_config_variables(self, request): diff --git a/apps/utils/basic.py b/apps/utils/basic.py index 375c02481..89a2da70f 100644 --- a/apps/utils/basic.py +++ b/apps/utils/basic.py @@ -8,7 +8,6 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import hashlib from collections import Counter, namedtuple from copy import deepcopy from typing import Any, Dict, Iterable, List, Set, Union @@ -72,21 +71,6 @@ def suffix_slash(os, path): return path -def md5(file_name): - """内部实现的平台无关性计算MD5""" - hash = hashlib.md5() - try: - with open(file_name, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - if not chunk: - break - hash.update(chunk) - except IOError: - return "-1" - - return hash.hexdigest() - - def chunk_lists(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): diff --git a/apps/utils/env.py b/apps/utils/env.py index f3cbabef8..bda8ea42c 100644 --- a/apps/utils/env.py +++ b/apps/utils/env.py @@ -11,9 +11,12 @@ import logging import os +from typing import Any from django.conf import settings +from apps.utils.string import str2bool + logger = logging.getLogger("app") """ @@ -74,3 +77,31 @@ def get_gse_env_path(package_name, is_windows=False): "pid_path": settings.GSE_AGENT_RUN_DIR + "/" + package_name + ".pid", "data_path": settings.GSE_AGENT_DATA_DIR, } + + +def get_type_env(key: str, default: Any = None, _type: type = str, exempt_empty_str: bool = False) -> Any: + """ + 获取环境变量并转为目标类型 + :param key: 变量名 + :param default: 默认值,若获取不到环境变量会默认使用该值 + :param _type: 环境变量需要转换的类型,不会转 default + :param exempt_empty_str: 是否豁免空串 + :return: + """ + value = os.getenv(key) or default + if value == default: + return value + + # 豁免空串 + if isinstance(value, str) and not value and exempt_empty_str: + return value + + if _type == bool: + return str2bool(value) + + try: + value = _type(value) + except TypeError: + raise TypeError(f"can not convert env value -> {value} to type -> {_type}") + + return value diff --git a/apps/utils/files.py b/apps/utils/files.py new file mode 100644 index 000000000..b70dda8da --- /dev/null +++ b/apps/utils/files.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import hashlib +import os +import uuid +from typing import IO, Any, Optional + +import requests + +from apps.node_man import constants + + +class file_open: + + closed: bool + # 标记是否通过路径打开 + is_name: bool + file_obj: Optional[IO[Any]] + + def __init__(self, name: str = None, file_obj: Optional[IO[Any]] = None, mode: str = "rb", closed: bool = True): + if not (file_obj or name): + raise ValueError("nothing to open") + + self.closed = closed + # 标记是否通过路径打开 + self.is_name = not file_obj + + if self.is_name: + self.file_obj = open(name, mode=mode) + else: + self.file_obj = file_obj + + def __enter__(self) -> IO[Any]: + return self.file_obj + + def __exit__(self, exc_type, exc_val, exc_tb): + + # 指针复位,避免 closed=False 场景下,影响上层逻辑对该文件对象的复用 + # 参考:https://stackoverflow.com/questions/3906137/why-cant-i-call-read-twice-on-an-open-file + self.file_obj.seek(0) + + # 通过路径open的文件对象必须关闭 + # 传入的文件对象由上层逻辑决定是否显式传入不关闭 + if self.is_name or (not self.is_name and self.closed): + self.file_obj.close() + + +def md5sum(name: str = None, file_obj: Optional[IO[Any]] = None, mode: str = "rb", closed: bool = True) -> str: + """ + 计算文件md5 + :param name: 文件路径 + :param file_obj: 已打开的文件文件对象,同时传 name 和 file_obj 后者优先使用 + :param mode: 文件打开模式,具体参考 open docstring,默认 rb + :param closed: 是否返回时关闭文件对象,安全起见默认关闭 + :return: md5 str or "-1" + """ + + hash_md5 = hashlib.md5() + try: + + with file_open(name=name, file_obj=file_obj, mode=mode, closed=closed) as fs: + for chunk in iter(lambda: fs.read(4096), b""): + if not chunk: + continue + hash_md5.update(chunk) + + except IOError: + return "-1" + + return hash_md5.hexdigest() + + +def download_file( + url: str, + name: str = None, + file_obj: Optional[IO[Any]] = None, + mode: str = "wb", + closed: bool = True, +) -> None: + + """ + 下载文件 + :param url: 下载url + :param name: 写入目标路径 + :param file_obj: 已打开的写入目标文件对象 + :param mode: 文件打开模式,具体参考 open docstring,默认 rb + :param closed: 是否返回时关闭文件对象,安全起见默认关闭 + :return: None + """ + + with requests.get(url=url, stream=True) as rfs: + + rfs.raise_for_status() + + with file_open(name=name, file_obj=file_obj, mode=mode, closed=closed) as local_fs: + for chunk in rfs.iter_content(chunk_size=4096): + if not chunk: + continue + local_fs.write(chunk) + + +def mk_and_return_tmpdir() -> str: + """ + 创建并返回临时目录 + :return: + """ + tmp_dir = os.path.join(constants.TMP_DIR, constants.TMP_FILE_NAME_FORMAT.format(name=uuid.uuid4().hex)) + os.makedirs(tmp_dir, exist_ok=True) + return tmp_dir diff --git a/apps/utils/string.py b/apps/utils/string.py new file mode 100644 index 000000000..aa490e85b --- /dev/null +++ b/apps/utils/string.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from typing import Optional + + +def str2bool(string: Optional[str], strict: bool = True) -> bool: + """ + 字符串转布尔值 + 对于bool(str) 仅在len(str) == 0 or str is None 的情况下为False,为了适配bool("False") 等环境变量取值情况,定义该函数 + 参考:https://stackoverflow.com/questions/21732123/convert-true-false-value-read-from-file-to-boolean + :param string: + :param strict: 严格校验,非 False / True / false / true 时抛出异常,用于环境变量的转换 + :return: + """ + if string in ["False", "false"]: + return False + if string in ["True", "true"]: + return True + + if strict: + raise ValueError(f"{string} can not convert to bool") + return bool(string) diff --git a/apps/utils/tests/test_env.py b/apps/utils/tests/test_env.py new file mode 100644 index 000000000..26665da39 --- /dev/null +++ b/apps/utils/tests/test_env.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import os +import uuid + +from apps.utils import env +from apps.utils.unittest.testcase import CustomBaseTestCase + + +class TestEnv(CustomBaseTestCase): + def test_get_type_env(self): + def _test_bool_type(): + _cases = [ + {"key": uuid.uuid4().hex, "value": "True", "except_value": True}, + {"key": uuid.uuid4().hex, "value": "False", "except_value": False}, + {"key": uuid.uuid4().hex, "value": "false", "except_value": False}, + {"key": uuid.uuid4().hex, "value": "true", "except_value": True}, + ] + for _case in _cases: + os.environ[_case["key"]] = _case["value"] + self.assertEqual(env.get_type_env(key=_case["key"], _type=bool), _case["except_value"]) + + os.environ.pop(_case["key"]) + + _not_bool_key = uuid.uuid4().hex + os.environ[_not_bool_key] = "not bool" + self.assertRaises(ValueError, env.get_type_env, key=_not_bool_key, _type=bool) + os.environ.pop(_not_bool_key) + + def _test_int_type(): + _numbers = [0, -1, 1, 1234567890] + for _number in _numbers: + _key = uuid.uuid4().hex + os.environ[_key] = str(_number) + self.assertEqual(env.get_type_env(key=_key, _type=int), _number) + os.environ.pop(_key) + + def _test_str_type(): + _strings = ["1", "2"] + for _string in _strings: + _key = uuid.uuid4().hex + os.environ[_key] = str(_string) + self.assertEqual(env.get_type_env(key=_key, _type=str), _string) + os.environ.pop(_key) + + def _test_get_default(): + _default_values = [10, False, "TYPE"] + for _default_value in _default_values: + self.assertEqual(env.get_type_env(key=uuid.uuid4().hex, default=_default_value), _default_value) + + _test_bool_type() + _test_int_type() + _test_str_type() + _test_get_default() diff --git a/config/default.py b/config/default.py index 1c1ef88c2..05f8ed119 100644 --- a/config/default.py +++ b/config/default.py @@ -9,9 +9,11 @@ specific language governing permissions and limitations under the License. """ import sys +from enum import Enum from blueapps.conf.default_settings import * # noqa +from apps.utils.env import get_type_env from config import ENVIRONMENT # pipeline 配置 @@ -284,6 +286,76 @@ CACHES["default"] = CACHES["db"] + +# ============================================================================== +# 文件存储 +# ============================================================================== + + +class StorageType(Enum): + """文件存储类型""" + + # 本地文件系统 + FILE_SYSTEM = "FILE_SYSTEM" + + # 制品库 + BKREPO = "BKREPO" + + +# 用于控制默认的文件存储类型 +# 更多类型参考 apps.node_man.constants.STORAGE_TYPE +STORAGE_TYPE = os.getenv("STORAGE_TYPE", StorageType.FILE_SYSTEM.value) + +# 是否覆盖同名文件 +FILE_OVERWRITE = get_type_env("FILE_OVERWRITE", _type=bool, default=False) + +# 节点管理后台外网域名,用于构造文件导入导出的API URL +BACKEND_HOST = os.getenv("BKAPP_BACKEND_HOST", "") + +BKREPO_USERNAME = os.getenv("BKREPO_USERNAME") +BKREPO_PASSWORD = os.getenv("BKREPO_PASSWORD") +BKREPO_PROJECT = os.getenv("BKREPO_PROJECT") +# 默认文件存放仓库 +BKREPO_BUCKET = os.getenv("BKREPO_BUCKET") +# 对象存储平台域名 +BKREPO_ENDPOINT_URL = os.getenv("BKREPO_ENDPOINT_URL") + +# 存储类型 - storage class 映射关系 +STORAGE_TYPE_IMPORT_PATH_MAP = { + StorageType.FILE_SYSTEM.value: "apps.core.files.storage.AdminFileSystemStorage", + StorageType.BKREPO.value: "apps.core.files.storage.CustomBKRepoStorage", +} + +# 默认的file storage +DEFAULT_FILE_STORAGE = STORAGE_TYPE_IMPORT_PATH_MAP[STORAGE_TYPE] + +# 本地文件系统上传文件后台API +FILE_SYSTEM_UPLOAD_API = f"{BACKEND_HOST}/backend/package/upload/" + +# 对象存储上传文件后台API +COS_UPLOAD_API = f"{BACKEND_HOST}/backend/package/upload_cos/" + +# 暂时存在多个上传API的原因:原有文件上传接口被Nginx转发 +STORAGE_TYPE_UPLOAD_API_MAP = { + StorageType.FILE_SYSTEM.value: FILE_SYSTEM_UPLOAD_API, + StorageType.BKREPO.value: COS_UPLOAD_API, +} + +DEFAULT_FILE_UPLOAD_API = STORAGE_TYPE_UPLOAD_API_MAP[STORAGE_TYPE] + +BKAPP_NODEMAN_DOWNLOAD_API = f"{BACKEND_HOST}/backend/export/download/" + +PUBLIC_PATH = os.getenv("BKAPP_PUBLIC_PATH") or "/data/bkee/public/bknodeman/" + +# NGINX miniweb路径 +DOWNLOAD_PATH = os.path.join(PUBLIC_PATH, "download") + +# 上传文件的保存位置 +UPLOAD_PATH = os.path.join(PUBLIC_PATH, "upload") + +# 下载文件路径 +EXPORT_PATH = os.path.join(PUBLIC_PATH, "export") + # ============================================================================== # 后台配置 # ============================================================================== @@ -450,32 +522,12 @@ } LOGGING["loggers"]["iam"] = {"handlers": ["iam"], "level": LOGGING["loggers"]["root"]["level"], "propagate": True} -PUBLIC_PATH = os.getenv("BKAPP_PUBLIC_PATH") or "/data/bkee/public/bknodeman/" - -# 上传文件的保存位置 -UPLOAD_PATH = os.path.join(PUBLIC_PATH, "upload") - -# 下载文件路径 -EXPORT_PATH = os.path.join(PUBLIC_PATH, "export") - -# NGINX miniweb路径 -NGINX_DOWNLOAD_PATH = os.path.join(PUBLIC_PATH, "download") - # 节点管理后台 LAN_IP BKAPP_LAN_IP = os.getenv("LAN_IP") # 节点管理后台 NFS_IP BKAPP_NFS_IP = os.getenv("NFS_IP") or BKAPP_LAN_IP -# 节点管理后台外网域名 -# TODO: 需要部署侧提供 -BACKEND_HOST = os.getenv("BKAPP_BACKEND_HOST", "") - -# 文件上传接口 -BKAPP_NODEMAN_UPLOAD_URL = f"{BACKEND_HOST}/backend/package/upload/" - -BKAPP_NODEMAN_DOWNLOAD_URL = f"{BACKEND_HOST}/backend/export/download/" - # 节点管理回调地址 BKAPP_NODEMAN_CALLBACK_URL = os.getenv("BKAPP_NODEMAN_CALLBACK_URL", "") BKAPP_NODEMAN_OUTER_CALLBACK_URL = os.getenv("BKAPP_NODEMAN_OUTER_CALLBACK_URL", "") @@ -500,6 +552,7 @@ VERSION_LOG = {"MD_FILES_DIR": os.path.join(PROJECT_ROOT, "release")} + # remove disabled apps if locals().get("DISABLED_APPS"): INSTALLED_APPS = locals().get("INSTALLED_APPS", []) diff --git a/requirements.txt b/requirements.txt index 6a02a0c1e..ccceb1231 100644 --- a/requirements.txt +++ b/requirements.txt @@ -61,3 +61,5 @@ prettytable==2.1.0 raven==6.1.0 # for apm ddtrace==0.14.1 + +bkstorages==1.0.1