diff --git a/apps/backend/agent/manager.py b/apps/backend/agent/manager.py index b0270e6b5..0e843cf51 100644 --- a/apps/backend/agent/manager.py +++ b/apps/backend/agent/manager.py @@ -358,7 +358,7 @@ def push_files_to_proxy(self, file): type=Var.PLAIN, value=[{"ip": self.host_info["bk_host_innerip"], "bk_cloud_id": self.host_info["bk_cloud_id"]}], ) - act.component.inputs.file_target_path = Var(type=Var.PLAIN, value=settings.NGINX_DOWNLOAD_PATH) + act.component.inputs.file_target_path = Var(type=Var.PLAIN, value=settings.DOWNLOAD_PATH) act.component.inputs.files = Var(type=Var.PLAIN, value=file["files"]) act.component.inputs.from_type = Var(type=Var.PLAIN, value=file.get("from_type", "")) act.component.inputs.context = Var(type=Var.PLAIN, value="") @@ -370,7 +370,7 @@ def start_nginx(self): with open(path, encoding="utf-8") as fh: script = fh.read() script_content = script % { - "nginx_path": settings.NGINX_DOWNLOAD_PATH, + "nginx_path": settings.DOWNLOAD_PATH, "bk_nodeman_nginx_download_port": settings.BK_NODEMAN_NGINX_DOWNLOAD_PORT, "bk_nodeman_nginx_proxy_pass_port": settings.BK_NODEMAN_NGINX_PROXY_PASS_PORT, } @@ -451,14 +451,16 @@ def _operate_process( setup_path = path_handler.join( package.proc_control.install_path, - "external_plugins", + const.PluginChildDir.EXTERNAL.value, group_id, package.project, ) pid_path_prefix, pid_filename = path_handler.split(package.proc_control.pid_path) pid_path = path_handler.join(pid_path_prefix, group_id, pid_filename) else: - setup_path = path_handler.join(package.proc_control.install_path, "plugins", "bin") + setup_path = path_handler.join( + package.proc_control.install_path, const.PluginChildDir.OFFICIAL.value, "bin" + ) pid_path = package.proc_control.pid_path act = AgentServiceActivity( diff --git a/apps/backend/agent/tools.py b/apps/backend/agent/tools.py index 882401c67..680502ecb 100644 --- a/apps/backend/agent/tools.py +++ b/apps/backend/agent/tools.py @@ -179,7 +179,7 @@ def gen_commands(host: models.Host, pipeline_id: str, is_uninstall: bool) -> Ins dest_dir = host.agent_config["temp_path"] dest_dir = suffix_slash(host.os_type.lower(), dest_dir) if script_file_name == constants.SetupScriptFileName.SETUP_PAGENT_PY.value: - run_cmd_params.append(f"-L {settings.NGINX_DOWNLOAD_PATH}") + run_cmd_params.append(f"-L {settings.DOWNLOAD_PATH}") # 云区域自动安装 upstream_nodes = [proxy.inner_ip for proxy in host.proxies] host.upstream_nodes = proxies diff --git a/apps/backend/components/collections/agent.py b/apps/backend/components/collections/agent.py index efd02dd3e..f393904bc 100644 --- a/apps/backend/components/collections/agent.py +++ b/apps/backend/components/collections/agent.py @@ -915,7 +915,7 @@ def execute(self, data, parent_data): self.logger.info(_("开始下发升级包")) host_info = data.get_one_of_inputs("host_info") host = Host.get_by_host_info(host_info) - nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH + nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH data.inputs.file_target_path = host.agent_config["temp_path"] os_type = host.os_type.lower() @@ -1269,7 +1269,7 @@ def _execute(self, data, parent_data): else: path_handler = posixpath - setup_path = path_handler.join(package.proc_control.install_path, "plugins", "bin") + setup_path = path_handler.join(package.proc_control.install_path, const.PluginChildDir.OFFICIAL.value, "bin") pid_path = package.proc_control.pid_path result = gse_client.register_process(hosts, control, setup_path, pid_path, plugin_name, plugin_name) diff --git a/apps/backend/components/collections/bulk_job.py b/apps/backend/components/collections/bulk_job.py index cbf81c1ef..02915ad9f 100644 --- a/apps/backend/components/collections/bulk_job.py +++ b/apps/backend/components/collections/bulk_job.py @@ -423,7 +423,7 @@ def execute(self, data, parent_data): self.logger.info(_("开始下发升级包")) host_info = data.get_one_of_inputs("host_info") host = models.Host.get_by_host_info(host_info) - nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH + nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH data.inputs.file_target_path = host.agent_config["temp_path"] os_type = host.os_type.lower() diff --git a/apps/backend/components/collections/bulk_job_redis.py b/apps/backend/components/collections/bulk_job_redis.py index f48e18029..9013e9550 100644 --- a/apps/backend/components/collections/bulk_job_redis.py +++ b/apps/backend/components/collections/bulk_job_redis.py @@ -412,7 +412,7 @@ def execute(self, data, parent_data): self.logger.info(_("开始下发升级包")) host_info = data.get_one_of_inputs("host_info") host = models.Host.get_by_host_info(host_info) - nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH + nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH data.inputs.file_target_path = host.agent_config["temp_path"] os_type = host.os_type.lower() diff --git a/apps/backend/components/collections/job.py b/apps/backend/components/collections/job.py index 07267dab5..6bf3e1df6 100644 --- a/apps/backend/components/collections/job.py +++ b/apps/backend/components/collections/job.py @@ -573,7 +573,7 @@ def execute(self, data, parent_data): data.inputs.file_source = [ { - "files": [f"{settings.NGINX_DOWNLOAD_PATH}/{file}" for file in files], + "files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files], "account": "root", "ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}], } diff --git a/apps/backend/components/collections/plugin.py b/apps/backend/components/collections/plugin.py index 142cf98cf..e3aaf63c2 100644 --- a/apps/backend/components/collections/plugin.py +++ b/apps/backend/components/collections/plugin.py @@ -277,7 +277,9 @@ def get_plugins_paths( pid_filename = f"{plugin_name}.pid" if package.plugin_desc.category == constants.CategoryType.external: # 如果为 external 第三方插件,需要补上插件组目录 - setup_path = path_handler.join(ap_config["setup_path"], "external_plugins", group_id, package.project) + setup_path = path_handler.join( + ap_config["setup_path"], constants.PluginChildDir.EXTERNAL.value, group_id, package.project + ) if subscription.category == subscription.CategoryType.DEBUG: # debug模式下特殊处理这些路径 @@ -289,7 +291,7 @@ def get_plugins_paths( log_path = path_handler.join(ap_config["log_path"], group_id) data_path = path_handler.join(ap_config["data_path"], group_id) else: - setup_path = path_handler.join(ap_config["setup_path"], "plugins", "bin") + setup_path = path_handler.join(ap_config["setup_path"], constants.PluginChildDir.OFFICIAL.value, "bin") pid_path = path_handler.join(ap_config["run_path"], pid_filename) log_path = ap_config["log_path"] data_path = ap_config["data_path"] @@ -386,7 +388,7 @@ def _execute(self, data, parent_data, common_data: CommonData): # 如 linux-arm、linux-x86、windows-x86 的插件,需分为三组 # 把多个IP合并为一个任务,可以利用GSE文件管道的BT能力,提高传输效率 jobs: Dict[str, Dict[str, Union[list, str]]] = defaultdict(lambda: defaultdict(list)) - nginx_path = settings.NGINX_DOWNLOAD_PATH + nginx_path = settings.DOWNLOAD_PATH for process_status in process_statuses: bk_host_id = process_status.bk_host_id subscription_instance = group_id_instance_map.get(process_status.group_id) diff --git a/apps/backend/exceptions.py b/apps/backend/exceptions.py index fe6781987..717d6a1f8 100644 --- a/apps/backend/exceptions.py +++ b/apps/backend/exceptions.py @@ -56,3 +56,13 @@ class GenCommandsError(BackendBaseException): class GseEncryptedError(BackendBaseException): MESSAGE = _("GSE敏感信息加密失败") ERROR_CODE = 8 + + +class PluginParseError(BackendBaseException): + MESSAGE = _("插件解析错误") + ERROR_CODE = 9 + + +class CreatePackageRecordError(BackendBaseException): + MESSAGE = _("归档插件包信息错误") + ERROR_CODE = 10 diff --git a/apps/backend/management/commands/copy_file_to_nginx.py b/apps/backend/management/commands/copy_file_to_nginx.py index f65de4db0..c9aff9e6b 100644 --- a/apps/backend/management/commands/copy_file_to_nginx.py +++ b/apps/backend/management/commands/copy_file_to_nginx.py @@ -29,7 +29,7 @@ def handle(self, *args, **options): # 接入点配置的nginx路径 nginx_paths = [ap.nginx_path for ap in AccessPoint.objects.all() if ap.nginx_path] # 默认nginx路径 - nginx_paths.append(settings.NGINX_DOWNLOAD_PATH) + nginx_paths.append(settings.DOWNLOAD_PATH) # 去重 nginx_paths = list(set(nginx_paths)) for _path in os.listdir(settings.BK_SCRIPTS_PATH): diff --git a/apps/backend/management/commands/init_official_plugins.py b/apps/backend/management/commands/init_official_plugins.py index cff5c0df2..d8ce99d98 100644 --- a/apps/backend/management/commands/init_official_plugins.py +++ b/apps/backend/management/commands/init_official_plugins.py @@ -19,8 +19,9 @@ from django.core.management.base import BaseCommand from django.db.transaction import atomic +from apps.backend.plugin import tools from apps.node_man import models -from apps.utils.basic import md5 +from apps.utils.files import md5sum from common.log import logger @@ -59,7 +60,7 @@ def handle(self, *args, **options): # 后续可以考虑通过路径来判断 module="gse_plugin", file_path=file_abs_path, - md5=md5(file_abs_path), + md5=md5sum(name=file_abs_path), operator="system", source_app_code="bk_nodeman", file_name=file_name, @@ -68,10 +69,11 @@ def handle(self, *args, **options): try: # 如果是官方内置的插件,那么应该是直接发布的 - package_list = upload_record.create_package_records( + package_list = tools.create_package_records( + file_path=upload_record.file_path, + file_name=upload_record.file_name, is_release=True, is_template_load=True, - is_template_overwrite=True, ) except Exception as error: # 但是需要注意这个文件可能是已经存在的文件,会有导入失败的问题 diff --git a/apps/backend/plugin/serializers.py b/apps/backend/plugin/serializers.py index f40993cb8..840f6c66c 100644 --- a/apps/backend/plugin/serializers.py +++ b/apps/backend/plugin/serializers.py @@ -13,11 +13,12 @@ import base64 import hashlib +from django.utils.translation import ugettext_lazy as _ from rest_framework import serializers from rest_framework.exceptions import ValidationError from apps.node_man import constants -from apps.node_man.models import GsePluginDesc, Packages +from apps.node_man.models import DownloadRecord, GsePluginDesc, Packages class GatewaySerializer(serializers.Serializer): @@ -202,14 +203,28 @@ def validate(self, attrs): return attrs -class UploadInfoSerializer(GatewaySerializer): +class UploadInfoBaseSerializer(GatewaySerializer): + md5 = serializers.CharField(help_text=_("上传端计算的文件md5"), max_length=32) + file_name = serializers.CharField(help_text=_("上传端提供的文件名"), min_length=1) + module = serializers.CharField(max_length=32, required=False, default="gse_plugin") + + +class UploadInfoSerializer(UploadInfoBaseSerializer): """上传插件包接口序列化器""" - module = serializers.CharField(max_length=32, required=False, default="gse_plugin") - md5 = serializers.CharField(max_length=32) - file_name = serializers.CharField() - file_local_path = serializers.CharField(max_length=512) - file_local_md5 = serializers.CharField(max_length=32) + file_local_path = serializers.CharField(help_text=_("本地文件路径"), max_length=512) + file_local_md5 = serializers.CharField(help_text=_("Nginx所计算的文件md5"), max_length=32) + + +class CosUploadInfoSerializer(UploadInfoBaseSerializer): + download_url = serializers.URLField(help_text=_("对象存储文件下载url"), required=False) + file_path = serializers.CharField(help_text=_("文件保存路径"), min_length=1, required=False) + + def validate(self, attrs): + # 两种参数模式少要有一种满足 + if not ("download_url" in attrs or "file_path" in attrs): + raise ValidationError("at least has download_url or file_path") + return attrs class PluginStartDebugSerializer(GatewaySerializer): @@ -238,17 +253,25 @@ def validate(self, attrs): class PluginRegisterSerializer(GatewaySerializer): - file_name = serializers.CharField() - is_release = serializers.BooleanField() + file_name = serializers.CharField(help_text=_("文件名称")) + is_release = serializers.BooleanField(help_text=_("是否立即发布该插件")) # 两个配置文件相关参数选填,兼容监控 - # 是否需要读取配置文件 - is_template_load = serializers.BooleanField(required=False, default=False) - # 是否可以覆盖已经存在的配置文件 - is_template_overwrite = serializers.BooleanField(required=False, default=False) + is_template_load = serializers.BooleanField(help_text=_("是否需要读取配置文件"), required=False, default=False) + is_template_overwrite = serializers.BooleanField(help_text=_("是否可以覆盖已经存在的配置文件"), required=False, default=False) + # TODO 废弃字段,改用 select_pkg_relative_paths,待与前端联调后移除该字段 select_pkg_abs_paths = serializers.ListField(required=False, min_length=1, child=serializers.CharField()) + select_pkg_relative_paths = serializers.ListField( + required=False, min_length=1, child=serializers.CharField(), help_text=_("选择注册的插件包相对路径,缺省默认全选") + ) + + def validate(self, attrs): + attrs["select_pkg_relative_paths"] = attrs.get("select_pkg_abs_paths") + + return attrs + class PluginRegisterTaskSerializer(GatewaySerializer): job_id = serializers.IntegerField() @@ -287,6 +310,15 @@ def validate(self, data): creator = serializers.CharField() bk_app_code = serializers.CharField() + def validate(self, attrs): + if attrs["category"] not in DownloadRecord.CATEGORY_TASK_DICT: + raise ValidationError( + "请求下载类型 -> {category} 暂不支持,可选项 -> {choices}".format( + category=attrs["category"], choices=DownloadRecord.CATEGORY_CHOICES + ) + ) + return attrs + class DeletePluginSerializer(GatewaySerializer): name = serializers.CharField() diff --git a/apps/backend/plugin/tasks.py b/apps/backend/plugin/tasks.py index 60b6363c5..2adc0308a 100644 --- a/apps/backend/plugin/tasks.py +++ b/apps/backend/plugin/tasks.py @@ -18,6 +18,7 @@ from django.utils.translation import ugettext as _ from apps.backend.celery import app +from apps.backend.plugin import tools from apps.backend.utils.pipeline_parser import PipelineParser as CustomPipelineParser from apps.node_man import constants as const from apps.node_man import models @@ -41,23 +42,24 @@ def package_task(job_id, task_params): job = models.Job.objects.get(id=job_id, job_type=const.JobType.PACKING_PLUGIN) except models.Job.DoesNotExist: - logger.error("try to execute job->[%s] but is not exists") + logger.error("try to execute job-> {job_id} but is not exists".format(job_id=job_id)) return False try: file_name = task_params["file_name"] is_release = task_params["is_release"] - select_pkg_abs_paths = task_params.get("select_pkg_abs_paths") + select_pkg_relative_paths = task_params.get("select_pkg_relative_paths") # 使用最后的一条上传记录 upload_package_object = models.UploadPackage.objects.filter(file_name=file_name).order_by("-upload_time")[0] # 2. 执行任务 - upload_package_object.create_package_records( + tools.create_package_records( + file_path=upload_package_object.file_path, + file_name=upload_package_object.file_name, is_release=is_release, creator=task_params["bk_username"], - select_pkg_abs_paths=select_pkg_abs_paths, + select_pkg_relative_paths=select_pkg_relative_paths, is_template_load=task_params.get("is_template_load", False), - is_template_overwrite=task_params.get("is_template_overwrite", False), ) except PermissionError: @@ -85,7 +87,7 @@ def package_task(job_id, task_params): job.save() if job.status == const.JobStatusType.SUCCESS: - logger.info("task->[%s] has finish all job." % job.id) + logger.info("task -> {job_id} has finish all job.".format(job_id=job.id)) @app.task(queue="backend") diff --git a/apps/backend/plugin/tools.py b/apps/backend/plugin/tools.py index c33ab9615..b8f2bf03b 100644 --- a/apps/backend/plugin/tools.py +++ b/apps/backend/plugin/tools.py @@ -10,124 +10,241 @@ """ import logging import os +import shutil +import tarfile import traceback from collections import defaultdict -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import yaml +from django.conf import settings +from django.db import transaction +from django.utils import timezone from django.utils.translation import ugettext_lazy as _ from packaging import version -from apps.backend.exceptions import PackageVersionValidationError +from apps.backend import exceptions +from apps.core.files.storage import get_storage from apps.node_man import constants, models +from apps.utils import env, files logger = logging.getLogger("app") -def parse_package(package_info: dict, is_update: bool, project: str) -> dict: - package_parse_detail = { +def list_package_infos(file_path: str) -> List[Dict[str, Any]]: + """ + :param file_path: 插件包所在路径 + 解析`self.file_path`下插件,获取包信息字典列表 + :return: [ + { + # 存放插件的临时目录 + "plugin_tmp_dir": "/tmp/12134/", + # 插件包的相对路径 + "pkg_relative_path": "plugins_linux_x86_64/package_name" + # 插件包的绝对路径 + "pkg_absolute_path": "/tmp/12134/plugins_linux_x86_64/package_name", + # 插件所需操作系统 + "package_os": "linux", + # 支持cpu位数 + "cpu_arch": "x86_64", + # 是否为自定义插件 + "is_external": "False" + }, + ... + ] + """ + storage = get_storage() + if not storage.exists(name=file_path): + raise exceptions.PluginParseError(_("插件不存在: file_path -> {file_path}").format(file_path=file_path)) + + # 解压压缩文件 + tmp_dir = files.mk_and_return_tmpdir() + with storage.open(name=file_path, mode="rb") as tf_from_storage: + with tarfile.open(fileobj=tf_from_storage) as tf: + # 检查是否存在可疑内容 + for file_info in tf.getmembers(): + if file_info.name.startswith("/") or "../" in file_info.name: + logger.error( + "file-> {file_path} contains member-> {name} try to escape!".format( + file_path=file_path, name=file_info.name + ) + ) + raise exceptions.PluginParseError(_("文件包含非法路径成员 -> {name},请检查").format(name=file_info.name)) + logger.info( + "file-> {file_path} extract to path -> {tmp_dir} success.".format(file_path=file_path, tmp_dir=tmp_dir) + ) + tf.extractall(path=tmp_dir) + + package_infos = [] + + # 遍历第一层的内容,获取操作系统和cpu架构信息,eg:external(可无,有表示自定义插件)_plugins_linux_x86_64 + for first_plugin_dir_name in os.listdir(tmp_dir): + # 通过正则提取出插件(plugin)目录名中的插件信息 + re_match = constants.PACKAGE_PATH_RE.match(first_plugin_dir_name) + if re_match is None: + logger.info( + "pkg_dir_name -> {pkg_dir_name} is not match re, jump it.".format(pkg_dir_name=first_plugin_dir_name) + ) + continue + + # 将文件名解析为插件信息字典 + plugin_info_dict = re_match.groupdict() + current_os = plugin_info_dict["os"] + cpu_arch = plugin_info_dict["cpu_arch"] + logger.info( + "pkg_dir_name -> {pkg_dir_name} is match for os -> {os}, cpu -> {cpu_arch}".format( + pkg_dir_name=first_plugin_dir_name, os=current_os, cpu_arch=cpu_arch + ) + ) + + first_level_plugin_path = os.path.join(tmp_dir, first_plugin_dir_name) + # 遍历第二层的内容,获取包名, eg:plugins_linux_x86_64/package_name + for second_package_dir_name in os.listdir(first_level_plugin_path): + # 拼接获取包路径 + second_level_package_dir_path = os.path.join(first_level_plugin_path, second_package_dir_name) + if not os.path.isdir(second_level_package_dir_path): + logger.info("found file path -> {path} jump it".format(path=second_level_package_dir_path)) + continue + + package_infos.append( + { + "plugin_tmp_dir": tmp_dir, + "pkg_relative_path": os.path.join(first_plugin_dir_name, second_package_dir_name), + "pkg_absolute_path": second_level_package_dir_path, + "package_os": current_os, + "cpu_arch": cpu_arch, + "is_external": plugin_info_dict["is_external"] is not None, + } + ) + + return package_infos + + +def parse_package( + pkg_absolute_path: str, package_os: str, cpu_arch: str, is_update: bool, need_detail: bool = False +) -> Dict[str, Any]: + """ + 解析插件包 + :param pkg_absolute_path: 插件包所在的绝对路径 + :param package_os: 操作系统类型,lower + :param cpu_arch: cpu架构 + :param is_update: 是否校验更新 + :param need_detail: 是否需要解析详情,用于create_package_record创建插件包记录 + :return: + """ + pkg_parse_info = { "result": True, "message": "", "pkg_name": None, - "pkg_abs_path": package_info["pkg_abs_path"], "project": None, "version": None, "category": None, - "config_templates": [], - "os": package_info["package_os"], - "cpu_arch": package_info["cpu_arch"], "description": None, + "config_templates": [], + "os": package_os, + "cpu_arch": cpu_arch, } - update_flag = False - # 1. 判断是否存在project.yaml文件 - project_file_path = os.path.join(package_info["dir_path"], "project.yaml") - if not os.path.exists(project_file_path): + # 判断是否存在project.yaml文件 + project_yaml_file_path = os.path.join(pkg_absolute_path, "project.yaml") + if not os.path.exists(project_yaml_file_path): logger.warning( - "try to pack path->[%s] but is not [project.yaml] file under file path" % package_info["dir_path"] + "try to pack path-> {pkg_absolute_path} but not [project.yaml] file under file path".format( + pkg_absolute_path=pkg_absolute_path + ) ) - package_parse_detail["result"] = False - package_parse_detail["message"] = _("缺少project.yaml文件") - return package_parse_detail + pkg_parse_info["result"] = False + pkg_parse_info["message"] = _("缺少project.yaml文件") + return pkg_parse_info - # 2. 解析project.yaml文件(版本,插件名等信息) + # 解析project.yaml文件(版本,插件名等信息) try: - with open(project_file_path, "r", encoding="utf-8") as project_file: + with open(project_yaml_file_path, "r", encoding="utf-8") as project_file: yaml_config = yaml.safe_load(project_file) if not isinstance(yaml_config, dict): raise yaml.YAMLError except (IOError, yaml.YAMLError): logger.warning( - "failed to parse or read project_yaml->[{}] for->[{}]".format(project_file_path, traceback.format_exc()) + "failed to parse or read project_yaml -> {project_yaml_file_path}, for -> {err_msg}".format( + project_yaml_file_path=project_yaml_file_path, err_msg=traceback.format_exc() + ) ) - package_parse_detail["result"] = False - package_parse_detail["message"] = _("project.yaml文件解析读取失败") - return package_parse_detail + pkg_parse_info["result"] = False + pkg_parse_info["message"] = _("project.yaml文件解析读取失败") + return pkg_parse_info try: # 解析版本号转为字符串,防止x.x情况被解析为浮点型,同时便于后续写入及比较 yaml_config["version"] = str(yaml_config["version"]) - package_parse_detail.update( + pkg_parse_info.update( { - "pkg_name": "{}-{}.tgz".format(yaml_config["name"], yaml_config["version"]), + "pkg_name": "{project}-{version}.tgz".format( + project=yaml_config["name"], version=yaml_config["version"] + ), "project": yaml_config["name"], "version": yaml_config["version"], "category": yaml_config["category"], "description": yaml_config.get("description", ""), } ) - except KeyError: + + # 无法解析 project.control 成为 python 字典类型 + if not isinstance(yaml_config.get("control", {}), dict): + raise TypeError("can't not convert control in project.yaml to python dict.") + + except (KeyError, TypeError): logger.warning( - "failed to get key info from project.yaml->[%s] for->[%s] maybe config file error?" - % (project_file_path, traceback.format_exc()) + "failed to get key info from project.yaml -> {project_yaml_file_path}, for -> {err_msg}".format( + project_yaml_file_path=project_yaml_file_path, err_msg=traceback.format_exc() + ) ) - package_parse_detail["result"] = False - package_parse_detail["message"] = _("project.yaml文件信息缺失") - return package_parse_detail + pkg_parse_info["result"] = False + pkg_parse_info["message"] = _("project.yaml 文件信息缺失") + return pkg_parse_info + # 插件包版本更新标志,用于描述插件包是否为下述的「更新插件版本」情况 + update_flag = False + # 插件包最新版本预先初始化为当前解析插件包的版本 package_release_version = yaml_config["version"] - # 更新插件名称对应不上 - if is_update and yaml_config["name"] != project: - raise PackageVersionValidationError( - _("期望更新的插件为[{project}],实际上传的插件为[{update_plugin_name}]").format( - project=project, update_plugin_name=yaml_config["name"] + # 判断插件类型是否符合预取 + if pkg_parse_info["category"] not in constants.CATEGORY_TUPLE: + logger.warning( + "project -> {project}, version -> {version}: update(or create) with category-> {category} " + "which is not acceptable, nothing will do.".format( + project=pkg_parse_info["project"], + version=pkg_parse_info["version"], + category=pkg_parse_info["category"], ) ) + pkg_parse_info["result"] = False + pkg_parse_info["message"] = _("project.yaml 中 category 配置异常,请确认后重试") + return pkg_parse_info - # 3. 判断插件类型是否符合预取 - if yaml_config["category"] not in constants.CATEGORY_TUPLE: - logger.warning( - "project->[%s] version->[%s] update(or create) with category->[%s] which is not acceptable, " - "nothing will do." % (yaml_config["name"], yaml_config["version"], yaml_config["category"]) - ) - package_parse_detail["result"] = False - package_parse_detail["message"] = _("project.yaml中category配置异常,请确认后重试") - return package_parse_detail - package_parse_detail["category"] = constants.CATEGORY_DICT[yaml_config["category"]] + packages_queryset = models.Packages.objects.filter(project=yaml_config["name"], os=package_os, cpu_arch=cpu_arch) - packages_queryset = models.Packages.objects.filter( - project=yaml_config["name"], os=package_info["package_os"], cpu_arch=package_info["cpu_arch"] - ) - # 4. 判断是否为新增插件 + # 判断是否为新增插件 if not packages_queryset.exists(): logger.info( - "project->[%s] os->[%s] cpu_arch->[%s] is not exists, this operations will create new package" - % (yaml_config["name"], package_info["package_os"], package_info["cpu_arch"]) + "project-> {project}, os-> {os}, cpu_arch-> {cpu_arch} is not exists, " + "this operations will create new package".format( + project=pkg_parse_info["project"], os=package_os, cpu_arch=cpu_arch + ) ) - package_parse_detail["message"] = _("新增插件") + pkg_parse_info["message"] = _("新增插件") - # 5. 判断以前是否已发布过该插件版本 + # 判断是否已发布过该插件版本 elif packages_queryset.filter(version=yaml_config["version"], is_release_version=True, is_ready=True).exists(): logger.warning( - "project->[%s] version->[%s] os->[%s] cpu_arch->[%s] is release, no more operations is " - "allowed." - % (yaml_config["name"], yaml_config["version"], package_info["package_os"], package_info["cpu_arch"]) + "project -> {project}, version-> {version}, os-> {os}, cpu_arch -> {cpu_arch} is release, " + "will overwrite it".format( + project=pkg_parse_info["project"], version=pkg_parse_info["version"], os=package_os, cpu_arch=cpu_arch + ) ) - package_parse_detail["message"] = _("已有版本插件更新") + pkg_parse_info["message"] = _("已有版本插件更新") - # 6. 判断预导入插件版本同最新版本的关系 + # 判断预导入插件版本同最新版本的关系 else: # 取出最新版本号 package_release_version = sorted( @@ -135,50 +252,60 @@ def parse_package(package_info: dict, is_update: bool, project: str) -> dict: )[-1] if version.parse(package_release_version) > version.parse(yaml_config["version"]): - package_parse_detail["message"] = _("低版本插件仅支持导入") + pkg_parse_info["message"] = _("低版本插件仅支持导入") else: update_flag = True - package_parse_detail["message"] = _("更新插件版本") + pkg_parse_info["message"] = _("更新插件版本") logger.info( - "project->[{project}] validate version: is_update->[{is_update}], update_flag->[{update_flag}]".format( + "project -> {project} validate version: is_update -> {is_update}, update_flag -> {update_flag}".format( project=yaml_config["name"], is_update=is_update, update_flag=update_flag ) ) # 需要校验版本更新,但该插件没有升级 if is_update and not update_flag: - raise PackageVersionValidationError( - _("文件路径[{pkg_abs_path}]所在包解析版本为[{parse_version}], 最新版本为[{release_version}], 更新校验失败").format( - pkg_abs_path=package_parse_detail["pkg_abs_path"], - parse_version=package_parse_detail["version"], + raise exceptions.PackageVersionValidationError( + _("文件路径 -> {pkg_absolute_path} 所在包解析版本为 -> {version}, 最新版本 -> {release_version}, 更新校验失败").format( + pkg_absolute_path=pkg_absolute_path, + version=pkg_parse_info["version"], release_version=package_release_version, ) ) - # 7. 解析配置模板 + # 解析配置模板 config_templates = yaml_config.get("config_templates", []) for config_template in config_templates: + # 配置模板所在的相对路径 source_path = config_template["source_path"] - template_file_path = os.path.join(package_info["dir_path"], source_path) + # 配置模板所在的绝对路径 + template_file_path = os.path.join(pkg_absolute_path, source_path) if not os.path.exists(template_file_path): logger.warning( - "project.yaml need to import file->[%s] but is not exists, nothing will do." - % config_template["source_path"] + "project.yaml need to import file -> {source_path} but is not exists, nothing will do.".format( + source_path=config_template["source_path"] + ) ) - package_parse_detail["result"] = False - package_parse_detail["message"] = _("找不到需要导入的配置模板文件[%s]") % source_path - return package_parse_detail + pkg_parse_info["result"] = False + pkg_parse_info["message"] = _("找不到需要导入的配置模板文件 -> {source_path}").format(source_path=source_path) + return pkg_parse_info - package_parse_detail["config_templates"].append( + pkg_parse_info["config_templates"].append( { "name": config_template["name"], + "is_main": config_template.get("is_main_config", False), + "source_path": source_path, + "file_path": config_template["file_path"], + "format": config_template["format"], # 解析版本号转为字符串,防止x.x情况被解析为浮点型,同时便于后续写入及比较 "version": str(config_template["version"]), - "is_main": config_template.get("is_main_config", False), + "plugin_version": str(config_template["plugin_version"]), } ) - return package_parse_detail + + if need_detail: + pkg_parse_info["yaml_config"] = yaml_config + return pkg_parse_info def fetch_latest_config_templates(config_templates: List[Dict[str, Any]]) -> List[Dict[str, Any]]: @@ -194,3 +321,286 @@ def fetch_latest_config_templates(config_templates: List[Dict[str, Any]]) -> Lis latest_config_templates.append(config_tmpls_order_by_version[-1]) return latest_config_templates + + +def create_package_records( + file_path: str, + file_name: str, + is_release: bool, + creator: Optional[str] = None, + select_pkg_relative_paths: Optional[List[str]] = None, + is_template_load: bool = False, +) -> List[models.Packages]: + """ + 解析上传插件,拆分为插件包并保存记录 + :param file_path: 上传插件所在路径 + :param file_name: 上传插件名称 + :param is_release: 是否正式发布 + :param creator: 操作人 + :param select_pkg_relative_paths: 指定注册插件包的相对路径列表 + :param is_template_load: 是否需要读取配置文件 + :return: [package_object, ...] + :return: + """ + pkg_record_objs = [] + package_infos = list_package_infos(file_path=file_path) + + with transaction.atomic(): + for package_info in package_infos: + if not ( + select_pkg_relative_paths is None or package_info["pkg_relative_path"] in select_pkg_relative_paths + ): + logger.info("path -> {path} not selected, jump it".format(path=package_info["pkg_relative_path"])) + continue + pkg_record_obj = create_pkg_record( + pkg_absolute_path=package_info["pkg_absolute_path"], + package_os=package_info["package_os"], + cpu_arch=package_info["cpu_arch"], + is_release=is_release, + creator=creator, + is_external=package_info["is_external"], + is_template_load=is_template_load, + ) + + logger.info( + "package path -> {path} add to pkg record-> {record_id} success.".format( + path=package_info["pkg_relative_path"], record_id=pkg_record_obj.id + ) + ) + pkg_record_objs.append(pkg_record_obj) + + logger.info("plugin -> {file_name} create pkg record all done.".format(file_name=file_name)) + + # 清理临时解压目录 + plugin_tmp_dirs = set([package_info["plugin_tmp_dir"] for package_info in package_infos]) + for plugin_tmp_dir in plugin_tmp_dirs: + shutil.rmtree(plugin_tmp_dir) + + return pkg_record_objs + + +@transaction.atomic +def create_pkg_record( + pkg_absolute_path: str, + package_os: str, + cpu_arch: str, + is_external: bool, + creator: Optional[str] = None, + is_release: bool = True, + is_template_load: bool = False, +) -> models.Packages: + """ + 给定一个插件的路径,分析路径下的project.yaml,生成压缩包到nginx(多台)目录下 + !!!注意:该任务可能会导致长期的卡顿,请务必注意不要再wsgi等单线程环境中调用!!! + :param pkg_absolute_path: 需要进行打包的插件包临时解压路径, 例如,plugin_a 路径,路径下放置了插件包各个文件 + ⚠️ 该路径应为本地临时路径,插件包已从存储源下载到该路径 + :param package_os: 插件包支持的操作系统类型 + :param cpu_arch: 插件支持的CPU架构 + :param is_external: 是否第三方插件 + :param creator: 操作人 + :param is_release: 是否发布的版本 + :param is_template_load: 是否需要读取插件包中的配置模板 + :return: True | raise Exception + """ + pkg_parse_info = parse_package( + pkg_absolute_path=pkg_absolute_path, package_os=package_os, cpu_arch=cpu_arch, is_update=False, need_detail=True + ) + logger.info(f"pkg_absolute_path -> {pkg_absolute_path}, pkg_parse_info -> {pkg_parse_info}") + if not pkg_parse_info["result"]: + raise exceptions.PluginParseError(pkg_parse_info.get("message")) + + project = pkg_parse_info["project"] + yaml_config = pkg_parse_info["yaml_config"] + + # 判断是否已经由插件描述信息,需要写入 + desc, created = models.GsePluginDesc.objects.update_or_create( + name=project, + defaults=dict( + description=yaml_config.get("description", ""), + scenario=yaml_config.get("scenario", ""), + description_en=yaml_config.get("description_en", ""), + scenario_en=yaml_config.get("scenario_en", ""), + category=yaml_config["category"], + launch_node=yaml_config.get("launch_node", "all"), + config_file=yaml_config.get("config_file", ""), + config_format=yaml_config.get("config_format", ""), + use_db=bool(yaml_config.get("use_db", False)), + auto_launch=bool(yaml_config.get("auto_launch", False)), + is_binary=bool(yaml_config.get("is_binary", True)), + node_manage_control=yaml_config.get("node_manage_control", ""), + ), + ) + if created: + logger.info( + "plugin_desc_id -> {plugin_desc_id} for project -> {project} is created".format( + plugin_desc_id=desc.id, project=project + ) + ) + + # 写入插件包信息 + packages_queryset = models.Packages.objects.filter( + project=project, version=pkg_parse_info["version"], os=package_os, cpu_arch=cpu_arch + ) + if not packages_queryset.exists(): + # 如果之前未有未发布的插件包信息,需要新建 + pkg_record = models.Packages.objects.create( + pkg_name=pkg_parse_info["pkg_name"], + version=pkg_parse_info["version"], + module="gse_plugin", + creator=creator or settings.SYSTEM_USE_API_ACCOUNT, + project=project, + pkg_size=0, + pkg_path="", + md5="", + pkg_mtime="", + pkg_ctime="", + location="", + os=package_os, + cpu_arch=cpu_arch, + is_release_version=is_release, + is_ready=False, + ) + else: + # 否则,更新已有的记录即可 + pkg_record = packages_queryset.first() + + # 判断是否需要更新配置文件模板 + if is_template_load: + for config_template_info in pkg_parse_info["config_templates"]: + + template_file_path = os.path.join(pkg_absolute_path, config_template_info["source_path"]) + + with open(template_file_path) as template_fs: + config_template_obj, __ = models.PluginConfigTemplate.objects.update_or_create( + plugin_name=pkg_record.project, + plugin_version=config_template_info["plugin_version"], + name=config_template_info["name"], + version=config_template_info["version"], + is_main=config_template_info["is_main"], + defaults=dict( + format=config_template_info["format"], + file_path=config_template_info["file_path"], + content=template_fs.read(), + is_release_version=is_release, + creator="system", + create_time=timezone.now(), + source_app_code="bk_nodeman", + ), + ) + + logger.info( + "template -> {name} template_version -> {template_version} is create for plugin -> {project} " + "version -> {version}".format( + name=config_template_obj.name, + template_version=config_template_obj.version, + project=pkg_record.project, + version=pkg_record.version, + ) + ) + + proc_control, __ = models.ProcControl.objects.get_or_create( + plugin_package_id=pkg_record.id, defaults=dict(module="gse_plugin", project=pkg_parse_info["project"]) + ) + + # 更新插件包相关路径 + path_info = env.get_gse_env_path( + pkg_parse_info["project"], is_windows=(package_os == constants.OsType.WINDOWS.lower()) + ) + proc_control.install_path = path_info["install_path"] + proc_control.log_path = path_info["log_path"] + proc_control.data_path = path_info["data_path"] + proc_control.pid_path = path_info["pid_path"] + + # 更新插件包操作命令 + control_info = yaml_config.get("control", {}) + proc_control.start_cmd = control_info.get("start", "") + proc_control.stop_cmd = control_info.get("stop", "") + proc_control.restart_cmd = control_info.get("restart", "") + proc_control.reload_cmd = control_info.get("reload", "") + proc_control.kill_cmd = control_info.get("kill", "") + proc_control.version_cmd = control_info.get("version", "") + proc_control.health_cmd = control_info.get("health_check", "") + proc_control.debug_cmd = control_info.get("debug", "") + + proc_control.os = package_os + + # 更新插件二进制配置信息,如果不存在默认为空 + proc_control.process_name = yaml_config.get("process_name") + + # 更新是否需要托管 + proc_control.need_delegate = yaml_config.get("need_delegate", True) + + # 更新端口范围信息 + port_range = yaml_config.get("port_range", "") + + # 校验端口范围合法性 + port_range_list = models.ProcControl.parse_port_range(port_range) + if port_range_list: + proc_control.port_range = port_range + + proc_control.save() + + logger.info( + "process control -> {id} for plugin -> {project} version -> {version} os -> {os} is created.".format( + id=proc_control.id, project=project, version=pkg_record.version, os=package_os + ) + ) + + # 打包插件包,先在本地打包为tar + package_tmp_path = os.path.join(constants.TMP_DIR, f"{project}-{pkg_record.version}-{package_os}-{cpu_arch}.tgz") + with tarfile.open(package_tmp_path, "w:gz") as tf: + tf.add( + pkg_absolute_path, + # 判断是否第三方插件的路径 + arcname=f"{constants.PluginChildDir.EXTERNAL.value}/{project}" + if is_external + else f"{constants.PluginChildDir.OFFICIAL.value}/{project}", + ) + logger.info( + "project -> {project} version -> {version} now is pack to package_tmp_path -> {package_tmp_path}".format( + project=project, version=pkg_record.version, package_tmp_path=package_tmp_path + ) + ) + + # 将插件包上传到存储系统 + package_target_path = os.path.join(settings.DOWNLOAD_PATH, pkg_record.os, pkg_record.cpu_arch, pkg_record.pkg_name) + with open(package_tmp_path, mode="rb") as tf: + # 采用同名覆盖策略,保证同版本插件包仅保存一份 + storage_path = get_storage(file_overwrite=True).save(package_target_path, tf) + if storage_path != package_target_path: + logger.error( + "package save error, except save to -> {package_target_path}, but -> {storage_path}".format( + package_target_path=package_target_path, storage_path=storage_path + ) + ) + raise exceptions.CreatePackageRecordError( + _("插件包保存错误,期望保存到 -> {package_target_path}, 实际保存到 -> {storage_path}").format( + package_target_path=package_target_path, storage_path=storage_path + ) + ) + + # 补充插件包的文件存储信息 + pkg_record.is_ready = True + pkg_record.pkg_mtime = str(timezone.now()) + # pkg_ctime 仅记录该插件包信息的创建时间 + pkg_record.pkg_ctime = pkg_record.pkg_ctime or pkg_record.pkg_mtime + pkg_record.pkg_size = os.path.getsize(package_tmp_path) + pkg_record.pkg_path = os.path.dirname(package_target_path) + pkg_record.md5 = files.md5sum(name=package_tmp_path) + # 这里没有加上包名,是因为原本脚本(bkee/bkce)中就没有加上,为了防止已有逻辑异常,保持一致 + # 后面有哪位发现这里不适用了,可以一并修改 + pkg_record.location = f"http://{os.getenv('LAN_IP')}/download/{package_os}/{cpu_arch}" + + pkg_record.save() + + logger.info( + "package -> {pkg_name}, package_target_path -> {package_target_path} now is ready to use".format( + pkg_name=pkg_record.pkg_name, package_target_path=package_target_path + ) + ) + + # 清理临时文件 + os.remove(package_tmp_path) + logger.info("clean temp tgz file -> {temp_file_path} done.".format(temp_file_path=package_tmp_path)) + + return pkg_record diff --git a/apps/backend/plugin/views.py b/apps/backend/plugin/views.py index 9b3ba275d..b9c668b8c 100644 --- a/apps/backend/plugin/views.py +++ b/apps/backend/plugin/views.py @@ -17,6 +17,7 @@ import logging import os import re +import shutil from collections import defaultdict from copy import deepcopy from itertools import groupby @@ -46,6 +47,7 @@ ) from apps.backend.subscription.handler import SubscriptionHandler from apps.backend.subscription.tasks import run_subscription_task_and_create_instance +from apps.core.files.storage import get_storage from apps.exceptions import AppBaseException, ValidationError from apps.generic import APIViewSet from apps.node_man import constants as const @@ -53,6 +55,7 @@ from pipeline.engine.exceptions import InvalidOperationException from pipeline.service import task_service from pipeline.service.pipeline_engine_adapter.adapter_api import STATE_MAP +from apps.utils import files LOG_PREFIX_RE = re.compile(r"(\[\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}.*?\] )") logger = logging.getLogger("app") @@ -162,7 +165,9 @@ def create_plugin_register_task(self, request): ) # 这个新的任务,应该是指派到自己机器上的打包任务 tasks.package_task.delay(job.id, params) - logger.info("create job->[{}] to unpack file->[{}] plugin".format(job.id, file_name)) + logger.info( + "create job-> {job_id} to unpack file-> {file_name} plugin".format(job_id=job.id, file_name=file_name) + ) return Response({"job_id": job.id}) @@ -735,16 +740,18 @@ def create_export_task(self, request): record = models.DownloadRecord.create_record( category=params["category"], query_params=params["query_params"], - creator=params["creator"], + creator=params["bk_username"], source_app_code=params["bk_app_code"], ) logger.info( - "user->[%s] request to export from system->[%s] success created record->[%s]." - % (params["creator"], params["bk_app_code"], record.id) + "user -> {username} request to export from system -> {bk_app_code} success created " + "record -> {record_id}.".format( + username=params["bk_username"], bk_app_code=params["bk_app_code"], record_id=record.id + ) ) tasks.export_plugin.delay(record.id) - logger.info("record->[%s] now is active to celery" % record.id) + logger.info("record-> {record_id} now is active to celery".format(record_id=record.id)) return Response({"job_id": record.id}) @@ -773,19 +780,30 @@ def query_export_task(self, request): try: record = models.DownloadRecord.objects.get(id=job_id) except models.DownloadRecord.DoesNotExist: - logger.error("record->[%s] not exists, something go wrong?" % job_id) + logger.error("record-> {record_id} not exists, something go wrong?".format(record_id=job_id)) raise ValueError(_("请求任务不存在,请确认后重试")) - uri_with_params = "?".join([settings.BKAPP_NODEMAN_DOWNLOAD_URL, record.download_params]) + if record.is_failed or not record.file_path: + download_url = "" + else: + # TODO: 此处后续需要提供一个统一的 storage.tmp_url(name) 方法,用于插件包的临时下载 + if settings.STORAGE_TYPE in const.COS_TYPES: + download_url = get_storage().url(record.file_path) + else: + download_url = "?".join([settings.BKAPP_NODEMAN_DOWNLOAD_API, record.download_params]) response_data = { "is_finish": record.is_finish, "is_failed": record.is_failed, - "download_url": uri_with_params if not record.is_failed else "", # 下载URL + "download_url": download_url, "error_message": record.error_message, } - logger.info("export record->[{}] response_data->[{}]".format(job_id, response_data)) + logger.info( + "export record -> {record_id} response_data -> {response_data}".format( + record_id=job_id, response_data=response_data + ) + ) return Response(response_data) @action(detail=False, methods=["POST"], url_path="parse") @@ -840,13 +858,32 @@ def parse(self, request): ) if upload_package_obj is None: raise exceptions.UploadPackageNotExistError(_("找不到请求发布的文件,请确认后重试")) - # 获取包信息并解析 - return Response( - [ - tools.parse_package(package_info, params["is_update"], params.get("project")) - for package_info in upload_package_obj.list_package_infos() - ] - ) + + # 获取插件中各个插件包的路径信息 + package_infos = tools.list_package_infos(file_path=upload_package_obj.file_path) + # 解析插件包 + pkg_parse_results = [] + for package_info in package_infos: + pkg_parse_result = tools.parse_package( + pkg_absolute_path=package_info["pkg_absolute_path"], + package_os=package_info["package_os"], + cpu_arch=package_info["cpu_arch"], + is_update=params["is_update"], + ) + pkg_parse_result.update( + { + "pkg_abs_path": package_info["pkg_relative_path"], + # parse_package 对 category 执行校验并返回错误信息,此处category不一定是合法值,所以使用get填充释义 + "category": const.CATEGORY_DICT.get(pkg_parse_result["category"]), + } + ) + pkg_parse_results.append(pkg_parse_result) + + # 清理临时解压目录 + plugin_tmp_dirs = set([package_info["plugin_tmp_dir"] for package_info in package_infos]) + for plugin_tmp_dir in plugin_tmp_dirs: + shutil.rmtree(plugin_tmp_dir) + return Response(pkg_parse_results) def list(self, request, *args, **kwargs): """ @@ -1265,6 +1302,77 @@ def upload_package(request): ) +@csrf_exempt +@login_exempt +def upload_package_by_cos(request): + ser = serializers.CosUploadInfoSerializer(data=request.POST) + if not ser.is_valid(): + logger.error("failed to valid request data for->[%s] maybe something go wrong?" % ser.errors) + raise ValidationError(_("请求参数异常 [{err}],请确认后重试").format(err=ser.errors)) + + md5 = ser.data["md5"] + origin_file_name = ser.data["file_name"] + file_path = ser.data.get("file_path") + download_url: str = ser.data.get("download_url") + + storage = get_storage() + + # TODO 此处的md5校验放到文件实际读取使用的地方更合理? + # file_path 不为空表示文件已在项目管理的对象存储上,此时仅需校验md5,减少文件IO + if file_path: + try: + if files.md5sum(file_obj=storage.open(name=file_path)) != md5: + raise ValidationError(_("上传文件MD5校验失败,请确认重试")) + except Exception as e: + raise ValidationError(_("文件不存在:file_path -> {file_path},error -> {err}").format(file_path=file_path, err=e)) + else: + # 创建临时存放下载插件的目录 + tmp_dir = files.mk_and_return_tmpdir() + with open(file=os.path.join(tmp_dir, origin_file_name), mode="wb+") as fs: + # 下载文件并写入fs + files.download_file(url=download_url, file_obj=fs, closed=False) + # 计算下载文件的md5 + local_md5 = files.md5sum(file_obj=fs, closed=False) + if local_md5 != md5: + logger.error( + "failed to valid file md5 local->[{}] user->[{}] maybe network error".format(local_md5, md5) + ) + raise ValidationError(_("上传文件MD5校验失败,请确认重试")) + + # 使用上传端提供的期望保存文件名,保存文件到项目所管控的存储 + file_path = storage.save(name=os.path.join(settings.UPLOAD_PATH, origin_file_name), content=fs) + + # 移除临时目录 + shutil.rmtree(tmp_dir) + + record = models.UploadPackage.create_record( + module=ser.data["module"], + file_path=file_path, + md5=md5, + operator=ser.data["bk_username"], + source_app_code=ser.data["bk_app_code"], + # 此处使用落地到文件系统的文件名,对象存储情况下文件已经写到仓库,使用接口传入的file_name会在后续判断中再转移一次文件 + file_name=os.path.basename(file_path), + ) + logger.info( + "user->[%s] from app->[%s] upload file->[%s] success." + % (record.creator, record.source_app_code, record.file_path) + ) + + return JsonResponse( + { + "result": True, + "message": "", + "code": "00", + "data": { + "id": record.id, # 包文件的ID + "name": record.file_name, # 包名 + "pkg_size": record.file_size, # 单位byte + }, + } + ) + + @csrf_exempt @login_exempt def export_download(request): diff --git a/apps/backend/subscription/steps/base.py b/apps/backend/subscription/steps/base.py index f87490835..e96fa8f96 100644 --- a/apps/backend/subscription/steps/base.py +++ b/apps/backend/subscription/steps/base.py @@ -157,7 +157,7 @@ def _generate_process_status_record(self, host): # 如果为 external 插件,需要补上插件组目录 setup_path = path_handler.join( package.proc_control.install_path, - "external_plugins", + const.PluginChildDir.EXTERNAL.value, group_id, package.project, ) @@ -166,7 +166,9 @@ def _generate_process_status_record(self, host): pid_path_prefix, pid_filename = path_handler.split(package.proc_control.pid_path) pid_path = path_handler.join(pid_path_prefix, group_id, pid_filename) else: - setup_path = path_handler.join(package.proc_control.install_path, "plugins", "bin") + setup_path = path_handler.join( + package.proc_control.install_path, const.PluginChildDir.OFFICIAL.value, "bin" + ) log_path = package.proc_control.log_path data_path = package.proc_control.data_path pid_path = package.proc_control.pid_path diff --git a/apps/backend/subscription/steps/plugin.py b/apps/backend/subscription/steps/plugin.py index cc2f83e6d..0df826444 100644 --- a/apps/backend/subscription/steps/plugin.py +++ b/apps/backend/subscription/steps/plugin.py @@ -767,14 +767,16 @@ def generate_process_status_record( if package.plugin_desc.category == constants.CategoryType.external: # 如果为 external 插件,需要补上插件组目录 setup_path = path_handler.join( - package.proc_control.install_path, "external_plugins", group_id, package.project + package.proc_control.install_path, constants.PluginChildDir.EXTERNAL.value, group_id, package.project ) log_path = path_handler.join(package.proc_control.log_path, group_id) data_path = path_handler.join(package.proc_control.data_path, group_id) pid_path_prefix, pid_filename = path_handler.split(package.proc_control.pid_path) pid_path = path_handler.join(pid_path_prefix, group_id, pid_filename) else: - setup_path = path_handler.join(package.proc_control.install_path, "plugins", "bin") + setup_path = path_handler.join( + package.proc_control.install_path, constants.PluginChildDir.OFFICIAL.value, "bin" + ) log_path = package.proc_control.log_path data_path = package.proc_control.data_path pid_path = package.proc_control.pid_path diff --git a/apps/backend/tests/plugin/test_plugin.py b/apps/backend/tests/plugin/test_plugin.py index e7fa4ae16..acee06b80 100644 --- a/apps/backend/tests/plugin/test_plugin.py +++ b/apps/backend/tests/plugin/test_plugin.py @@ -21,6 +21,7 @@ from django.core.management import call_command from django.test import TestCase +from apps.backend.plugin import tools from apps.backend.plugin.tasks import export_plugin, package_task, run_pipeline from apps.backend.tests.plugin import utils from apps.backend.tests.plugin.test_plugin_status_change import TestApiBase @@ -184,7 +185,7 @@ def setUp(self): tfile.add(temp_file_path, arcname=".", recursive=True) # nginx的模拟路径 - settings.NGINX_DOWNLOAD_PATH = self.temp_path + settings.DOWNLOAD_PATH = self.temp_path settings.UPLOAD_PATH = self.upload_path settings.EXPORT_PATH = self.export_path @@ -204,8 +205,8 @@ def test_create_upload_record_and_register(self): """测试创建上传包记录功能""" # 插件包注册后存放地址 - windows_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") - linux_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") + windows_file_path = os.path.join(settings.DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") + linux_file_path = os.path.join(settings.DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") # 验证创建前此时文件不存在 self.assertFalse(os.path.exists(linux_file_path)) @@ -232,7 +233,9 @@ def test_create_upload_record_and_register(self): # 测试单独注册插件包功能 upload_object = UploadPackage.objects.get(file_name=self.tarfile_name) - package_object_list = upload_object.create_package_records(is_release=True) + package_object_list = tools.create_package_records( + file_path=upload_object.file_path, file_name=upload_object.file_name, is_release=True + ) self.assertEqual( GsePluginDesc.objects.get(name="test_plugin").node_manage_control, @@ -316,8 +319,8 @@ def _test_upload_api_success(self): def _test_register_api_success(self): # 插件包注册后存放地址 - windows_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") - linux_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") + windows_file_path = os.path.join(settings.DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") + linux_file_path = os.path.join(settings.DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") # 验证创建前此时文件不存在 self.assertFalse(os.path.exists(linux_file_path)) @@ -371,8 +374,8 @@ def test_create_task_register_optional_api(self): self._test_upload_api_success() # 插件包注册后存放地址 - windows_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") - linux_file_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") + windows_file_path = os.path.join(settings.DOWNLOAD_PATH, "windows", "x86", "test_plugin-1.0.1.tgz") + linux_file_path = os.path.join(settings.DOWNLOAD_PATH, "linux", "x86_64", "test_plugin-1.0.1.tgz") # 验证创建前此时文件不存在 self.assertFalse(os.path.exists(linux_file_path)) @@ -384,7 +387,6 @@ def test_create_task_register_optional_api(self): data={ "file_name": self.tarfile_name, "is_release": True, - "is_template_overwrite": True, "is_template_load": True, "select_pkg_abs_paths": ["external_plugins_windows_x86/test_plugin"], "bk_username": "admin", @@ -595,7 +597,7 @@ def test_parse_api_yaml_file_lack_attr_or_category_error(self): self.assertEqual(len(response["data"]), 2) self.assertEqual( - len([item for item in response["data"] if not item["result"] and item["message"] == "project.yaml文件信息缺失"]), + len([item for item in response["data"] if not item["result"] and item["message"] == "project.yaml 文件信息缺失"]), 1, ) self.assertEqual( @@ -603,7 +605,7 @@ def test_parse_api_yaml_file_lack_attr_or_category_error(self): [ item for item in response["data"] - if not item["result"] and item["message"] == "project.yaml中category配置异常,请确认后重试" + if not item["result"] and item["message"] == "project.yaml 中 category 配置异常,请确认后重试" ] ), 1, @@ -648,7 +650,7 @@ def test_parse_api_not_template_and_version_update(self): [ item for item in response["data"] - if not item["result"] and item["message"] == "找不到需要导入的配置模板文件[etc/test_plugin.main.conf.tpl]" + if not item["result"] and item["message"] == "找不到需要导入的配置模板文件 -> etc/test_plugin.main.conf.tpl" ] ), 1, @@ -773,7 +775,7 @@ def setUp(self): tfile.add(temp_file_path, arcname=".", recursive=True) # nginx的模拟路径 - settings.NGINX_DOWNLOAD_PATH = settings.UPLOAD_PATH = self.target_path + settings.DOWNLOAD_PATH = settings.UPLOAD_PATH = self.target_path settings.BK_OFFICIAL_PLUGINS_INIT_PATH = self.temp_path diff --git a/apps/backend/urls.py b/apps/backend/urls.py index 8bc179e4c..1a6f1d20f 100644 --- a/apps/backend/urls.py +++ b/apps/backend/urls.py @@ -13,7 +13,12 @@ from apps.backend import views from apps.backend.healthz.views import HealthzViewSet -from apps.backend.plugin.views import PluginViewSet, export_download, upload_package +from apps.backend.plugin.views import ( + PluginViewSet, + export_download, + upload_package, + upload_package_by_cos, +) from apps.backend.subscription.views import SubscriptionViewSet routers = drf_routers.DefaultRouter(trailing_slash=True) @@ -25,6 +30,7 @@ urlpatterns = [ url(r"api/", include(routers.urls)), url(r"^package/upload/$", upload_package), + url(r"^package/upload_cos/$", upload_package_by_cos), url(r"^export/download/$", export_download, name="export_download"), url(r"^export/", include(export_routers.urls)), url(r"^get_gse_config/", views.get_gse_config), diff --git a/apps/core/__init__.py b/apps/core/__init__.py new file mode 100644 index 000000000..5365eb9da --- /dev/null +++ b/apps/core/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +""" +core 文件夹用于放置 backend / node_man 等模块公用且具有一定代码规模的模块 +区别于utils:放置小型Python函数和类,这类代码用于简化代码逻辑,且与业务无关 +""" diff --git a/apps/core/files/__init__.py b/apps/core/files/__init__.py new file mode 100644 index 000000000..b402ee3b4 --- /dev/null +++ b/apps/core/files/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/apps/core/files/base.py b/apps/core/files/base.py new file mode 100644 index 000000000..60c7f294e --- /dev/null +++ b/apps/core/files/base.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import os + +from django.conf import settings +from django.core.exceptions import SuspiciousFileOperation +from django.utils.crypto import get_random_string + + +class StorageFileOverwriteMixin: + + file_overwrite = settings.FILE_OVERWRITE + + def get_available_name(self, name, max_length=None): + """重写获取文件有效名称函数,支持在 file_overwrite=True 时不随机生成文件名""" + + dir_name, file_name = os.path.split(name) + file_root, file_ext = os.path.splitext(file_name) + + def _gen_random_name(_file_root) -> str: + # 在文件名的起始位置添加随机串,源码规则为 "%s_%s%s" % (_file_root, get_random_string(7), file_ext) + # 上述规则对 .tar.gz 不友好,会在类型后缀中间加随机串,所以改为随机串作为前缀 + return os.path.join(dir_name, "%s_%s%s" % (get_random_string(7), _file_root, file_ext)) + + # not self.file_overwrite and self.exists(name) 利用 and 短路特点,如果 file_overwrite=True 就无需校验文件是否存在 + while (not self.file_overwrite and self.exists(name)) or (max_length and len(name) > max_length): + # file_ext includes the dot. + name = name if self.file_overwrite else _gen_random_name(file_root) + + if max_length is None: + continue + # Truncate file_root if max_length exceeded. + truncation = len(name) - max_length + if truncation > 0: + file_root = file_root[:-truncation] + # Entire file_root was truncated in attempt to find an available filename. + if not file_root: + raise SuspiciousFileOperation( + 'Storage can not find an available filename for "%s". ' + "Please make sure that the corresponding file field " + 'allows sufficient "max_length".' % name + ) + name = name if self.file_overwrite else _gen_random_name(file_root) + return name diff --git a/apps/core/files/storage.py b/apps/core/files/storage.py new file mode 100644 index 000000000..274fa7982 --- /dev/null +++ b/apps/core/files/storage.py @@ -0,0 +1,145 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import os +from typing import Callable, Dict + +from bkstorages.backends import bkrepo +from django.conf import settings +from django.core.files.storage import FileSystemStorage, Storage, get_storage_class +from django.utils.deconstruct import deconstructible +from django.utils.functional import cached_property + +from apps.core.files.base import StorageFileOverwriteMixin + + +@deconstructible +class CustomBKRepoStorage(StorageFileOverwriteMixin, bkrepo.BKRepoStorage): + + location = getattr(settings, "BKREPO_LOCATION", "") + file_overwrite = getattr(settings, "FILE_OVERWRITE", False) + + endpoint_url = settings.BKREPO_ENDPOINT_URL + username = settings.BKREPO_USERNAME + password = settings.BKREPO_PASSWORD + project_id = settings.BKREPO_PROJECT + bucket = settings.BKREPO_BUCKET + + def __init__( + self, + root_path=None, + username=None, + password=None, + project_id=None, + bucket=None, + endpoint_url=None, + file_overwrite=None, + ): + # 类成员变量应该和构造函数解耦,通过 params or default 的形式给构造参数赋值,防止该类被继承扩展时需要覆盖全部的成员默认值 + root_path = root_path or self.location + username = username or self.username + password = password or self.password + project_id = project_id or self.project_id + bucket = bucket or self.bucket + endpoint_url = endpoint_url or self.endpoint_url + file_overwrite = file_overwrite or self.file_overwrite + super().__init__( + root_path=root_path, + username=username, + password=password, + project_id=project_id, + bucket=bucket, + endpoint_url=endpoint_url, + file_overwrite=file_overwrite, + ) + + +@deconstructible +class AdminFileSystemStorage(StorageFileOverwriteMixin, FileSystemStorage): + + safe_class = FileSystemStorage + OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0) + + def __init__( + self, + location=None, + base_url=None, + file_permissions_mode=None, + directory_permissions_mode=None, + file_overwrite=None, + ): + super().__init__( + location=location, + base_url=base_url, + file_permissions_mode=file_permissions_mode, + directory_permissions_mode=directory_permissions_mode, + ) + + if file_overwrite is not None and isinstance(file_overwrite, bool): + self.file_overwrite = file_overwrite + + # 如果文件允许覆盖,去掉 O_CREAT 配置,存在文件打开时不报错 + if self.file_overwrite: + self.OS_OPEN_FLAGS = os.O_WRONLY | os.O_CREAT | getattr(os, "O_BINARY", 0) + + # 重写 path,将 safe_join 替换为 os.path.join,从而满足往「项目根路径」外读写的需求 + # safe_join 仅允许项目根目录以内的读写,具体参考 -> django.utils._os safe_join + # 本项目的读写控制不存在用户行为,保留safe_mode成员变量,便于切换 + def path(self, name): + return os.path.join(self.location, name) + + @cached_property + def location(self): + """路径指向 / ,重写前路径指向「项目根目录」""" + return self.base_location + + +# 缓存最基础的Storage +_STORAGE_OBJ_CACHE: [str, Storage] = {} + + +def cache_storage_obj(get_storage_func: Callable[[str, Dict], Storage]): + """用于Storage 缓存读写的装饰器""" + + def inner(storage_type: str = settings.STORAGE_TYPE, *args, **construct_params) -> Storage: + # 仅默认参数情况下返回缓存 + if not (construct_params or args) and storage_type in _STORAGE_OBJ_CACHE: + return _STORAGE_OBJ_CACHE[storage_type] + + storage_obj = get_storage_func(storage_type, *args, **construct_params) + + # 仅默认参数情况下写入缓存 + if not (construct_params or args): + _STORAGE_OBJ_CACHE[storage_type] = storage_obj + + return storage_obj + + return inner + + +@cache_storage_obj +def get_storage(storage_type: str = settings.STORAGE_TYPE, safe: bool = False, **construct_params) -> Storage: + """ + 获取 Storage + :param storage_type: 文件存储类型,参考 constants.StorageType + :param safe: 是否启用安全访问,当前项目不存在用户直接指定上传路径的情况,该字段使用默认值即可 + :param construct_params: storage class 构造参数,用于修改storage某些默认行为(写入仓库、base url等) + :return: Storage实例 + """ + storage_import_path = settings.STORAGE_TYPE_IMPORT_PATH_MAP.get(storage_type) + if storage_import_path is None: + raise ValueError(f"please provide valid storage_type {settings.STORAGE_TYPE_IMPORT_PATH_MAP.values()}") + storage_class = get_storage_class(import_path=storage_import_path) + + if safe: + if not hasattr(storage_class, "safe_class"): + raise ValueError(f"please add safe_class to {storage_class.__name__}") + return storage_class.safe_class(**construct_params) + return storage_class(**construct_params) diff --git a/apps/generic.py b/apps/generic.py index 581ebc872..09f9d9257 100644 --- a/apps/generic.py +++ b/apps/generic.py @@ -139,9 +139,17 @@ def custom_exception_handler(exc, context): """ logger.exception(getattr(exc, "message", exc)) request = context["request"] + + if request.method == "GET": + request_params = request.query_params + else: + if "multipart/form-data" in request.headers.get("Content-Type", ""): + request_params = {"files": str(getattr(request, "FILES"))} + else: + request_params = request.data + logger.error( - """捕获未处理异常, 请求URL->[%s], 请求方法->[%s] 请求参数->[%s]""" - % (request.path, request.method, json.dumps(request.query_params if request.method == "GET" else request.data)) + """捕获未处理异常, 请求URL->[%s], 请求方法->[%s] 请求参数->[%s]""" % (request.path, request.method, json.dumps(request_params)) ) # 专门处理 404 异常,直接返回前端,前端处理 if isinstance(exc, Http404): diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index 8438ec430..700633e62 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -14,6 +14,7 @@ import os import re from enum import Enum +from typing import List from django.conf import settings from django.utils.translation import ugettext_lazy as _ @@ -25,16 +26,23 @@ reverse_dict, tuple_choices, ) +from config.default import StorageType # 此值为历史遗留,后续蓝鲸不使用此字段后可废弃 DEFAULT_SUPPLIER_ID = 0 -LINUX_SEP = "/" -WINDOWS_SEP = "\\" - ######################################################################################################## # 任务超时控制 ######################################################################################################## + + +class TimeUnit: + SECOND = 1 + MINUTE = SECOND * 60 + HOUR = MINUTE * 60 + DAY = HOUR * 24 + + TASK_TIMEOUT = 0 # 脚本超时控制在180s=3min TASK_MAX_TIMEOUT = 3600 # 脚本超时控制在180s=3min JOB_MAX_RETRY = 60 # 默认轮询作业最大次数 100次=3min @@ -56,6 +64,35 @@ CC_HOST_FIELDS = ["bk_host_id", "bk_cloud_id", "bk_host_innerip", "bk_host_outerip", "bk_os_type", "bk_os_name"] + +######################################################################################################## +# 字符串常量 +######################################################################################################## + +LINUX_SEP = "/" + +WINDOWS_SEP = "\\" + +# 临时文件存放位置 +TMP_DIR = "/tmp" + +# 临时文件名格式模板 +TMP_FILE_NAME_FORMAT = "nm_tf_{name}" + + +class PluginChildDir(Enum): + EXTERNAL = "external_plugins" + OFFICIAL = "plugins" + + @classmethod + def get_optional_items(cls) -> List[str]: + return [cls.EXTERNAL.value, cls.OFFICIAL.value] + + +PACKAGE_PATH_RE = re.compile( + "(?Pexternal_)?plugins_(?P(linux|windows|aix))_(?P(x86_64|x86|powerpc|aarch64))" +) + ######################################################################################################## # CHOICES ######################################################################################################## @@ -441,10 +478,6 @@ def get_choices(cls): JOB_IP_STATUS_CHOICES = tuple_choices(JOB_IP_STATUS_TUPLE) JobIpStatusType = choices_to_namedtuple(JOB_IP_STATUS_CHOICES) -PACKAGE_PATH_RE = re.compile( - "(?Pexternal_)?plugins_(?P(linux|windows|aix))_(?P(x86_64|x86|powerpc|aarch64))" -) - SYNC_CMDB_HOST_BIZ_BLACKLIST = "SYNC_CMDB_HOST_BIZ_BLACKLIST" # 周期任务相关 @@ -684,11 +717,13 @@ class PolicyRollBackType: ROLLBACK_TYPE__ALIAS_MAP = {SUPPRESSED: "已被其他策略管控", LOSE_CONTROL: "脱离策略管控", TRANSFER_TO_ANOTHER: "转移到优先级最高的策略"} -class TimeUnit: - SECOND = 1 - MINUTE = SECOND * 60 - HOUR = MINUTE * 60 - DAY = HOUR * 24 +class CosBucketEnum(Enum): + PUBLIC = "public" + + PRIVATE = "private" + + +COS_TYPES = [StorageType.BKREPO.value] FILES_TO_PUSH_TO_PROXY = [ diff --git a/apps/node_man/exceptions.py b/apps/node_man/exceptions.py index 96fd6a1c7..ed915c4ae 100644 --- a/apps/node_man/exceptions.py +++ b/apps/node_man/exceptions.py @@ -207,3 +207,9 @@ class PolicyIsRunningError(NodeManBaseException): class InstallChannelNotExistsError(NodeManBaseException): ERROR_CODE = 37 MESSAGE = _("主机的安装通道不存在,请重新选择") + + +class PluginUploadError(NodeManBaseException): + MESSAGE = _("插件上传失败") + MESSAGE_TPL = _("插件上传失败: plugin_name -> {plugin_name}, error -> {error}") + ERROR_CODE = 38 diff --git a/apps/node_man/handlers/plugin_v2.py b/apps/node_man/handlers/plugin_v2.py index 0922d6e0b..606fd331d 100644 --- a/apps/node_man/handlers/plugin_v2.py +++ b/apps/node_man/handlers/plugin_v2.py @@ -11,16 +11,16 @@ import json import os import random -import shutil -import uuid from collections import ChainMap, defaultdict from typing import Any, Dict, List import requests from django.conf import settings from django.core.cache import cache +from django.core.files.uploadedfile import InMemoryUploadedFile from django.utils.translation import ugettext_lazy as _ +from apps.core.files.storage import get_storage from apps.node_man import constants, exceptions, models, tools from apps.node_man.constants import DEFAULT_CLOUD_NAME, IamActionType from apps.node_man.handlers.cmdb import CmdbHandler @@ -28,27 +28,70 @@ from apps.node_man.handlers.iam import IamHandler from apps.utils.basic import distinct_dict_list, list_slice from apps.utils.concurrent import batch_call +from apps.utils.files import md5sum from apps.utils.local import get_request_username from common.api import NodeApi class PluginV2Handler: @staticmethod - def upload(package_file, module, username): - tmp_dir = os.path.join("/tmp/", uuid.uuid4().hex) - os.mkdir(tmp_dir) - tmp_path = os.path.join(tmp_dir, package_file.name) - with open(tmp_path, "wb") as tmp_file: - for chunk in package_file.chunks(): - tmp_file.write(chunk) - md5 = tools.PluginV2Tools.get_file_md5(tmp_path) - with open(tmp_path, "rb") as tf: - response = requests.post( - url=settings.BKAPP_NODEMAN_UPLOAD_URL, - data={"bk_app_code": settings.APP_CODE, "bk_username": username, "module": module, "md5": md5}, - files={"package_file": tf}, - ) - shutil.rmtree(tmp_dir) + def upload(package_file: InMemoryUploadedFile, module: str) -> Dict[str, Any]: + """ + 将文件上传至 + :param package_file: InMemoryUploadedFile + :param module: 所属模块 + :return: + { + "result": True, + "message": "", + "code": "00", + "data": { + "id": record.id, # 上传文件记录ID + "name": record.file_name, # 包名 + "pkg_size": record.file_size, # 大小, + } + } + """ + with package_file.open("rb") as tf: + + # 计算上传文件的md5 + md5 = md5sum(file_obj=tf, closed=False) + + # 构造通用参数 + upload_params = { + "url": settings.DEFAULT_FILE_UPLOAD_API, + "data": { + "bk_app_code": settings.APP_CODE, + "bk_username": get_request_username(), + "module": module, + "md5": md5, + }, + } + + # 如果采用对象存储,文件直接上传至仓库,并将返回的目标路径传到后台,由后台进行校验并创建上传记录 + # TODO 后续应该由前端上传文件并提供md5 + if settings.STORAGE_TYPE in constants.COS_TYPES: + storage = get_storage() + + try: + storage_path = storage.save(name=os.path.join(settings.UPLOAD_PATH, tf.name), content=tf) + except Exception as e: + raise exceptions.PluginUploadError(plugin_name=tf.name, error=e) + + upload_params["data"].update( + { + # 最初文件上传的名称,后台会使用该文件名保存并覆盖同名文件 + "file_name": tf.name, + "file_path": storage_path, + "download_url": storage.url(storage_path), + } + ) + else: + # 本地文件系统仍通过上传文件到Nginx并回调后台 + upload_params["files"] = {"package_file": tf} + + response = requests.post(**upload_params) + return json.loads(response.content) @staticmethod diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 3c6837f8f..f0286c146 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -10,7 +10,6 @@ """ import base64 import copy -import errno import hashlib import json import os @@ -22,17 +21,17 @@ import uuid from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed +from distutils.dir_util import copy_tree from enum import Enum from functools import cmp_to_key from typing import Any, Dict, List, Optional, Set, Union import requests import six -import yaml from Cryptodome.Cipher import AES from django.conf import settings from django.core.cache import cache -from django.db import models, transaction +from django.db import models from django.db.models import DateTimeField, QuerySet from django.utils import timezone from django.utils.encoding import force_text @@ -45,6 +44,7 @@ from apps.backend.subscription.errors import PipelineExecuteFailed, SubscriptionNotExist from apps.backend.subscription.render_functions import get_hosts_by_node from apps.backend.utils.data_renderer import nested_render_data +from apps.core.files.storage import get_storage from apps.exceptions import ValidationError from apps.node_man import constants from apps.node_man.exceptions import ( @@ -55,8 +55,7 @@ InstallChannelNotExistsError, QueryGlobalSettingsException, ) -from apps.utils import env, orm -from apps.utils.basic import md5 +from apps.utils import files, orm from common.log import logger from pipeline.parser import PipelineParser from pipeline.service import task_service @@ -980,267 +979,7 @@ def proc_control(self): return self._proc_control @classmethod - @transaction.atomic - def create_record( - cls, - dir_path, - package_os, - cpu_arch, - is_external, - creator=None, - is_release=True, - is_template_load=False, - is_template_overwrite=False, - ): - """ - 给定一个插件的路径,分析路径下的project.yaml,生成压缩包到nginx(多台)目录下 - !!!注意:该任务可能会导致长期的卡顿,请务必注意不要再wsgi等单线程环境中调用!!! - :param dir_path: 需要进行打包的插件路径, 例如,plugin_a路径,路径下放置了插件各个文件 - :param package_os: 插件包支持的系统 - :param cpu_arch: 插件支持的CPU架构 - :param is_external: 是否第三方插件 - :param creator: 操作人 - :param is_release: 是否发布的版本 - :param is_template_load: 是否需要读取插件包中的配置模板 - :param is_template_overwrite: 是否可以覆盖已经存在的配置模板 - :return: True | raise Exception - """ - # 1. 判断是否存在project.yaml文件 - project_file_path = os.path.join(dir_path, "project.yaml") - if not os.path.exists(project_file_path): - logger.error("try to pack path->[%s] but is not [project.yaml] file under file path" % dir_path) - raise CreateRecordError(_("找不到 {} project.yaml文件,打包失败").format(dir_path)) - - # 2. 解析project.yaml文件(版本,插件名等信息) - try: - with open(project_file_path, "r", encoding="utf-8") as project_file: - yaml_config = yaml.safe_load(project_file) - - except (IOError, yaml.YAMLError) as error: - logger.error( - "failed to parse or read project_yaml->[{}] for->[{}]".format(project_file_path, traceback.format_exc()) - ) - six.raise_from(error, error) - - try: - # 解析版本号转为字符串,防止x.x情况被解析为浮点型,同时便于后续写入及比较 - yaml_config["version"] = str(yaml_config["version"]) - - package_name = yaml_config["name"] - version = yaml_config["version"] - control_info = yaml_config.get("control", {}) - - except KeyError as error: - logger.error( - "failed to get key info from project.yaml->[%s] for->[%s] maybe config file error?" - % (project_file_path, traceback.format_exc()) - ) - raise CreateRecordError(_("配置文件{}信息缺失,请确认后重试, 缺失字段: {}".format(project_file_path, error))) - - # 判断之前是否已经有发布过的该插件版本 - exists_object_list = cls.objects.filter(project=package_name, version=version, os=package_os, cpu_arch=cpu_arch) - if exists_object_list.filter(is_release_version=True).exists(): - logger.error( - "project->[%s] version->[%s] os->[%s] cpu_arch->[%s] is release, no more operations is " - "allowed." % (package_name, version, package_os, cpu_arch) - ) - - # 判断插件类型是否符合预期 - if yaml_config["category"] not in constants.CATEGORY_TUPLE: - logger.error( - "project->[%s] version->[%s] update(or create) with category->[%s] which is not acceptable, " - "nothing will do." % (package_name, version, yaml_config["category"]) - ) - raise ValueError(_("project.yaml中category配置异常,请确认后重试")) - - # 3. 创建新的插件包信息 - # 判断是否已经由插件描述信息,需要写入 - desc, created = GsePluginDesc.objects.update_or_create( - name=package_name, - defaults=dict( - description=yaml_config.get("description", ""), - scenario=yaml_config.get("scenario", ""), - description_en=yaml_config.get("description_en", ""), - scenario_en=yaml_config.get("scenario_en", ""), - category=yaml_config["category"], - launch_node=yaml_config.get("launch_node", "all"), - config_file=yaml_config.get("config_file", ""), - config_format=yaml_config.get("config_format", ""), - use_db=bool(yaml_config.get("use_db", False)), - auto_launch=bool(yaml_config.get("auto_launch", False)), - is_binary=bool(yaml_config.get("is_binary", True)), - node_manage_control=yaml_config.get("node_manage_control", ""), - ), - ) - if created: - logger.info("desc->[{}] for pack->[{}] is created".format(desc.id, package_name)) - - # 写入插件包信息 - file_name = "{}-{}.tgz".format(package_name, version) - if not exists_object_list.exists(): - # 如果之前未有未发布的插件包信息,需要新建 - record = cls.objects.create( - pkg_name=file_name, - version=version, - module="gse_plugin", - # TODO: 留坑 - creator=creator if creator is not None else settings.SYSTEM_USE_API_ACCOUNT, - project=package_name, - pkg_size=0, - pkg_path="", - md5="", - pkg_mtime="", - pkg_ctime="", - location="", - os=package_os, - cpu_arch=cpu_arch, - is_release_version=is_release, - is_ready=False, - ) - else: - # 否则,更新已有的记录即可 - record = exists_object_list[0] - - path_info = env.get_gse_env_path(package_name, is_windows=(package_os == "windows")) - try: - proc_control = ProcControl.objects.get(plugin_package_id=record.id) - - except ProcControl.DoesNotExist: - proc_control = ProcControl.objects.create( - module="gse_plugin", project=package_name, plugin_package_id=record.id - ) - - # 判断是否需要更新配置文件模板 - if is_template_load: - config_templates = yaml_config.get("config_templates", []) - for templates_info in config_templates: - - # 解析版本号转为字符串,防止x.x情况被解析为浮点型,同时便于后续写入及比较 - templates_info["version"] = str(templates_info["version"]) - templates_info["plugin_version"] = str(templates_info["plugin_version"]) - - is_main_config = templates_info.get("is_main_config", False) - source_path = templates_info["source_path"] - - template_file_path = os.path.join(dir_path, source_path) - if not os.path.exists(template_file_path): - logger.error( - "project.yaml need to import file->[%s] but is not exists, nothing will do." - % templates_info["source_path"] - ) - raise IOError(_("找不到需要导入的配置模板文件[%s]") % source_path) - - template, created = PluginConfigTemplate.objects.update_or_create( - plugin_name=record.project, - plugin_version=templates_info["plugin_version"], - name=templates_info["name"], - version=templates_info["version"], - is_main=is_main_config, - defaults=dict( - format=templates_info["format"], - file_path=templates_info["file_path"], - content=open(template_file_path).read(), - is_release_version=is_release, - creator="system", - create_time=timezone.now(), - source_app_code="bk_nodeman", - ), - ) - - logger.info( - "template->[%s] version->[%s] is create for plugin->[%s] version->[%s] is add" - % (template.name, template.version, record.project, record.version) - ) - - # 由于文件已经进入到了数据库中,此时需要清理tpl文件 - os.remove(template_file_path) - logger.info("template->[%s] now is delete for info has loaded into database." % template_file_path) - - # 更新信息 - proc_control.install_path = path_info["install_path"] - proc_control.log_path = path_info["log_path"] - proc_control.data_path = path_info["data_path"] - proc_control.pid_path = path_info["pid_path"] - proc_control.start_cmd = control_info.get("start", "") - proc_control.stop_cmd = control_info.get("stop", "") - proc_control.restart_cmd = control_info.get("restart", "") - proc_control.reload_cmd = control_info.get("reload", "") - proc_control.kill_cmd = control_info.get("kill", "") - proc_control.version_cmd = control_info.get("version", "") - proc_control.health_cmd = control_info.get("health_check", "") - proc_control.debug_cmd = control_info.get("debug", "") - proc_control.os = package_os - - # 更新插件二进制配置信息,如果不存在默认为空 - proc_control.process_name = yaml_config.get("process_name") - - # 更新是否需要托管 - proc_control.need_delegate = yaml_config.get("need_delegate", True) - - # 更新端口范围信息 - port_range = yaml_config.get("port_range", "") - - # 校验端口范围合法性 - port_range_list = ProcControl.parse_port_range(port_range) - if port_range_list: - proc_control.port_range = port_range - - proc_control.save() - logger.info( - "process control->[%s] for package->[%s] version->[%s] os->[%s] is created." - % (proc_control.id, package_name, version, package_os) - ) - - # 4. 打包创建新的tar包 - file_name = "{}-{}.tgz".format(package_name, version) - temp_file_path = "/tmp/{}-{}-{}-{}.tgz".format(package_name, version, package_os, cpu_arch) - nginx_path = os.path.join(settings.NGINX_DOWNLOAD_PATH, record.os, record.cpu_arch, file_name) - - try: - # 尝试创建 Nginx download path,已存在则忽略 - os.makedirs(os.path.dirname(nginx_path)) - except OSError as e: - if e.errno != errno.EEXIST: - raise e - - with tarfile.open(temp_file_path, "w:gz") as tfile: - tfile.add( - dir_path, - # 判断是否第三方插件的路径 - arcname="external_plugins/%s" % package_name if is_external else "plugins/", - ) - logger.info( - "package->[%s] version->[%s] now is pack to temp_path->[%s], ready to send to nginx." - % (package_name, version, file_name) - ) - - # 4. 文件SCP转移到nginx路径下 - # 注意:此处需要依赖 NGINX_DOWNLOAD_PATH 挂载到 NFS - shutil.copy(temp_file_path, nginx_path) - - # 5. 标记已经完成同步及其他信息 - record.is_ready = True - record.pkg_ctime = record.pkg_mtime = str(timezone.now()) - record.pkg_size = os.path.getsize(temp_file_path) - record.pkg_path = os.path.dirname(nginx_path) - record.md5 = md5(temp_file_path) - # 这里没有加上包名,是因为原本脚本(bkee/bkce)中就没有加上,为了防止已有逻辑异常,保持一致 - # 后面有哪位发现这里不适用了,可以一并修改 - record.location = "http://{}/download/{}/{}".format(os.getenv("LAN_IP"), package_os, cpu_arch) - - record.save() - logger.info( - "plugin->[{}] version->[{}] now is sync to nginx ready to use.".format(record.project, record.version) - ) - - # 清理临时文件 - os.remove(temp_file_path) - logger.info("clean temp tgz file -> [{temp_file_path}] done.".format(temp_file_path=temp_file_path)) - - return record - - @classmethod - def export_plugins(cls, project, version, os_type=None, cpu_arch=None): + def export_plugins(cls, project: str, version: str, os_type: str = None, cpu_arch: str = None) -> Dict[str, str]: """ 导出指定插件 !!! 注意:该方法会有打包及同步等高延迟的动作,请勿在同步环境(uwsgi)下使用 !!! @@ -1259,8 +998,8 @@ def export_plugins(cls, project, version, os_type=None, cpu_arch=None): filter_params["cpu_arch"] = cpu_arch # 1. 确认需要导出的文件 # 注意:未完成发布及nginx准备的插件不可导出 - plugin_list = cls.objects.filter(**filter_params, is_ready=True, is_release_version=True) - if not plugin_list.exists(): + package_objs = cls.objects.filter(**filter_params, is_ready=True, is_release_version=True) + if not package_objs.exists(): logger.error( "user try to export plugin project->[{project}] version->[{version}] " "filter_params->[{filter_params}] but is not exists, nothing will do.".format( @@ -1270,142 +1009,110 @@ def export_plugins(cls, project, version, os_type=None, cpu_arch=None): raise ValueError(_("找不到可导出插件,请确认后重试")) # 临时的解压目录 - temp_path = "/tmp/%s" % uuid.uuid4().hex - # 临时的压缩包路径 - temp_file_path = "/tmp/%s.tgz" % uuid.uuid4().hex + local_unzip_target_dir = files.mk_and_return_tmpdir() + # 暂存导出插件的文件路径 + export_plugin_tmp_path = os.path.join( + constants.TMP_DIR, constants.TMP_FILE_NAME_FORMAT.format(name=f"{uuid.uuid4().hex}.tgz") + ) # 2. 各个插件解压到指定的目录 - for plugin in plugin_list: - plugin.unzip(temp_path) + for package_obj in package_objs: + package_obj.unzip(local_unzip_target_dir) logger.info( - "plugin->[{}] os->[{}] cpu->[{}] unzip success.".format(plugin.pkg_name, plugin.os, plugin.cpu_arch) + "package -> {pkg_name} os -> {os} cpu -> {cpu_arch} unzip success.".format( + pkg_name=package_obj.pkg_name, os=package_obj.os, cpu_arch=package_obj.cpu_arch + ) ) - # 3. 解压的指定目录打包 - with tarfile.open(temp_file_path, "w:gz") as tar_file: + # 3. 将解压的各个插件包打包成一个完整的插件 + with tarfile.open(export_plugin_tmp_path, "w:gz") as tar_file: # temp_path下的内容由于是从plugin处解压获得,所以应该已经符合external_plugins或者plugins的目录规范 # 此处则不再指定 - tar_file.add(temp_path, ".") + tar_file.add(local_unzip_target_dir, ".") logger.debug( - "export plugin->[%s] version->[%s] create temp_file->[%s] from path->[%s] success, " - "ready to trans to nginx." % (project, version, temp_file_path, temp_path) + "export plugin -> {plugin_name} version -> {version} create export tmp path -> {export_plugin_tmp_path} " + "from path-> {local_unzip_target_dir} success, ready to storage".format( + plugin_name=project, + version=version, + export_plugin_tmp_path=export_plugin_tmp_path, + local_unzip_target_dir=local_unzip_target_dir, + ) ) - # 4. 同步到nginx指定目录 - file_name = "{}-{}-{}.tgz".format(project, version, md5(temp_file_path)) - - if not os.path.exists(settings.EXPORT_PATH): - os.makedirs(settings.EXPORT_PATH) + # 4. 将导出的插件上传到存储源 + plugin_export_target_path = os.path.join( + settings.EXPORT_PATH, f"{project}-{version}-{files.md5sum(name=export_plugin_tmp_path)}.tgz" + ) + with open(export_plugin_tmp_path, mode="rb") as tf: + storage_path = get_storage().save(plugin_export_target_path, tf) - download_file_path = os.path.join(settings.EXPORT_PATH, file_name) - shutil.copy(temp_file_path, download_file_path) logger.info( - "plugin->[{}] version->[{}] export file->[{}] is ready".format(project, version, download_file_path) + "export done: plugin-> {plugin_name} version -> {version} export file -> {storage_path}".format( + plugin_name=project, version=version, storage_path=storage_path + ) ) - logger.info("plugin->[{}] version->[{}] export job success.".format(project, version)) - # 清除临时文件 - shutil.rmtree(temp_path) - os.remove(temp_file_path) + os.remove(export_plugin_tmp_path) + shutil.rmtree(local_unzip_target_dir) + logger.info( - "clean temp tgz file -> [{temp_file_path}], temp path -> [{temp_path}] done.".format( - temp_file_path=temp_file_path, temp_path=temp_path + "plugin -> {plugin_name} version -> {version} export job success.".format( + plugin_name=project, version=version ) ) - return {"file_path": download_file_path} + return {"file_path": storage_path} - def unzip(self, target_path): + def unzip(self, local_target_dir: str) -> None: """ 将一个指定的插件解压到指定的目录下 - :param target_path: 指定的解压目录 + :param local_target_dir: 指定的解压目录 :return: True | raise Exception """ + storage = get_storage() file_path = os.path.join(self.pkg_path, self.pkg_name) - # 1. 获取文件 - if not os.path.exists(file_path): + # 校验插件包是否存在 + if not storage.exists(file_path): logger.error( - "try to unzip package->[{}] but file_path->[{}] is not exists, nothing will do.".format( - self.pkg_name, file_path + "try to unzip package-> {pkg_name} but file_path -> {file_path} is not exists, nothing will do.".format( + pkg_name=self.pkg_name, file_path=file_path ) ) - raise ValueError(_("插件文件不存在,请联系管理员处理")) - - # 2. 解压到指定的目录 - with tarfile.open(file_path) as tar_file: - - file_members = tar_file.getmembers() - - # 判断获取需要解压到的目标位置 - if "external_plugins" in file_members[0].name: - # 第三方插件的导出 - # 目标路径变更为:${target_path}/external_plugins_linux_x86/${project_name}/ - target_path = os.path.join( - target_path, "external_plugins_{}_{}".format(self.os, self.cpu_arch), self.project - ) - logger.info( - "project->[%s] version->[%s] is external_plugins so set target_path->[%s]" - % (self.project, self.version, target_path) - ) - plugin_root_path = "external_plugins/%s/" % self.project - type_root_path = "external_plugins/" - - else: - # 目标路径变更为:${target_path}/plugins_linux_x86/${project_name}/ - target_path = os.path.join(target_path, "plugins_{}_{}".format(self.os, self.cpu_arch), self.project) - logger.info( - "project->[%s] version->[%s] is offical plugins so set target_path->[%s]" - % (self.project, self.version, target_path) - ) - plugin_root_path = "plugins/%s/" % self.project - type_root_path = "plugins/" - - if not os.path.exists(target_path): - os.makedirs(target_path) - logger.info("temp path->[{}] for package->[{}] is created".format(target_path, self.pkg_name)) - - # 对所有的内容进行遍历,然后找到是文件的内容,解压到我们的目标路径上 - for member in file_members: - - # 如果是类型的层级文件夹,跳过 - if member.name == plugin_root_path[:-1] or member.name == type_root_path[:-1]: - logger.info( - "path->[{}] plugin_root_path->[{}] type_root_path->[{}] jump it".format( - member.name, plugin_root_path, type_root_path - ) - ) - continue + raise ValueError(_("插件包不存在,请联系管理员处理")) + + # 将插件包解压到临时目录中 + package_tmp_dir = files.mk_and_return_tmpdir() + # 文件的读取是从指定数据源(NFS或对象存储),可切换源模式,不直接使用原生open + with storage.open(name=file_path, mode="rb") as tf_from_storage: + with tarfile.open(fileobj=tf_from_storage) as tf: + tf.extractall(path=package_tmp_dir) + + # 遍历插件包的一级目录,找出 PluginExternalTypePrefix 匹配的文件夹并加入到指定的解压目录 + # 一般来说,插件包是具体到机器操作系统类型的,所以 package_tmp_dir 下基本只有一个目录 + for external_type_prefix in os.listdir(package_tmp_dir): + if external_type_prefix not in constants.PluginChildDir.get_optional_items(): + continue + # 将匹配的目录拷贝并格式化命名 + # 关于拷贝目录,参考:https://stackoverflow.com/questions/1868714/ + copy_tree( + src=os.path.join(package_tmp_dir, external_type_prefix), + dst=os.path.join(local_target_dir, f"{external_type_prefix}_{self.os}_{self.cpu_arch}"), + ) - # 解压时,只关注最底层的文件名及文件夹 - # 上层的external_plugins/project_name废弃 - file_name = member.name.replace(plugin_root_path, "") - logger.info( - "path->[{}] is extract to->[{}] with replace_root->[{}]".format( - member.name, file_name, plugin_root_path - ) - ) - current_target_path = os.path.join(target_path, file_name) - - # 此处使用私有方法,是因为改名没有其他方式了 - # 如果其他大锅有更好的方案,欢迎修改。。。囧 - tar_file._extract_member(member, current_target_path) - logger.info( - "project->[%s] version->[%s] file->[%s] is extract to->[%s]" - % (self.project, self.version, member.name, current_target_path) - ) + # 移除临时解压目录 + shutil.rmtree(package_tmp_dir) logger.info( - "package->[{}] os->[{}] cpu->[{}] unzip to path->[{}] success.".format( - self.pkg_name, self.os, self.cpu_arch, target_path + "package-> {pkg_name} os -> {os} cpu_arch -> {cpu_arch} unzip to " + "path -> {local_target_dir} success.".format( + pkg_name=self.pkg_name, os=self.os, cpu_arch=self.cpu_arch, local_target_dir=local_target_dir ) ) - return True - class Meta: verbose_name = _("模块/工程安装包信息表") verbose_name_plural = _("模块/工程安装包信息表") @@ -1550,210 +1257,69 @@ def create_record(cls, module, file_path, md5, operator, source_app_code, file_n """ 创建一个新的上传记录 :param module: 文件模块 - :param file_path: 文件在机器上的本地路径 + :param file_path: 文件源路径 :param md5: 文件MD5 :param operator: 操作者 :param source_app_code: 上传来源APP_CODE - :param file_name: 文件上传前的名字 + :param file_name: 期望的文件保存名,在非文件覆盖的情况下,该名称不是文件最终的保存名 :param is_file_copy: 是否复制而非剪切文件,适应初始化内置插件需要使用 :return: upload record """ # 注意:MD5参数值将会直接使用,因为服务器上的MD5是由nginx协助计算,应该在views限制 - # 1. 判断文件是否已经存在 - if not os.path.exists(file_path): - logger.warning( - "user->[{}] try to create record for file->[{}] but is not exists.".format(operator, file_path) - ) - raise CreateRecordError(_("文件{file_path}不存在,请确认后重试").format(file_path=file_path)) + try: + storage = get_storage() + # 判断文件是否已经存在 + if not storage.exists(file_path): + logger.warning( + "operator -> {operator} try to create record for file -> {file_path} but is not exists.".format( + operator=operator, file_path=file_path + ) + ) + raise CreateRecordError(_("文件{file_path}不存在,请确认后重试").format(file_path=file_path)) - # 判断上传文件的路径是否已经存在 - if not os.path.exists(settings.UPLOAD_PATH): - os.makedirs(settings.UPLOAD_PATH) - logger.info("path->[{}] is not exists, and now is created by us.".format(settings.UPLOAD_PATH)) + target_file_path = os.path.join(settings.UPLOAD_PATH, file_name) - # 3. 文件迁移到public - new_file_path = os.path.join(settings.UPLOAD_PATH, file_name) + # 如果读写路径一致无需拷贝文件,此处 target_file_path / file_path 属于同个文件源 + if target_file_path != file_path: + with storage.open(name=file_path, mode="rb") as fs: + # 不允许覆盖同名文件的情况下,文件名会添加随机串,此时 target_file_path / file_name 应刷新 + target_file_path = storage.save(name=target_file_path, content=fs) + file_name = os.path.basename(target_file_path) - try: - if is_file_copy: - shutil.copy(file_path, new_file_path) - else: - shutil.move(file_path, new_file_path) - except IOError: + # 如果是通过 mv 拷贝到指定目录,此时原文件应该删除 + if not is_file_copy: + storage.delete(file_path) + + record = cls.objects.create( + file_name=file_name, + module=module, + file_path=target_file_path, + file_size=storage.size(target_file_path), + md5=md5, + upload_time=timezone.now(), + creator=operator, + source_app_code=source_app_code, + ) + + except Exception: logger.error( - "failed to mv source_file->[%s] to targe_path->[%s] for->[%s]" - % (file_path, new_file_path, traceback.format_exc()) + "failed to mv source_file -> {file_path} to target_file_path -> {target_file_path}, " + "err_msg -> {err_msg}".format( + file_path=file_path, target_file_path=target_file_path, err_msg=traceback.format_exc() + ) ) raise CreateRecordError(_("文件迁移失败,请联系管理员协助处理")) - record = cls.objects.create( - file_name=file_name, - module=module, - file_path=new_file_path, - file_size=os.path.getsize(new_file_path), - md5=md5, - upload_time=timezone.now(), - creator=operator, - source_app_code=source_app_code, - ) logger.info( - "new record for file->[%s] module->[%s] is added by operator->[%s] from system->[%s]." - % (file_path, module, operator, source_app_code) + "new record for file -> {file_path} module -> {module} is added by operator -> {operator} " + "from system -> {source_app_code}.".format( + file_path=file_path, module=module, operator=operator, source_app_code=source_app_code + ) ) return record - def create_package_records( - self, is_release, creator=None, select_pkg_abs_paths=None, is_template_load=False, is_template_overwrite=False - ): - """ - 拆解一个上传包并将里面的插件录入到package表中 - :param is_release: 是否正式发布 - :param creator: 操作人 - :param select_pkg_abs_paths: 指定注册包名列表 - :param is_template_load: 是否需要读取配置文件 - :param is_template_overwrite: 是否可以覆盖已经存在的配置文件 - :return: [package_object, ...] - """ - # 1. 解压压缩文件 - package_result = [] - temp_path = "/tmp/%s" % uuid.uuid4().hex - - with tarfile.open(self.file_path) as tfile: - # 检查是否存在可疑内容 - for file_info in tfile.getmembers(): - if file_info.name.startswith("/") or "../" in file_info.name: - logger.error( - "WTF? file->[{}] contains member->[{}] try to escape! We won't use it.".format( - self.file_path, file_info.name - ) - ) - raise CreateRecordError(_("文件包含非法路径成员[{name}],请检查").format(name=file_info.name)) - - logger.info("file->[{}] extract to path->[{}] success.".format(self.file_path, temp_path)) - tfile.extractall(path=temp_path) - - # 2. 遍历第一层的内容,得知当前的操作系统和cpu架构信息 - with transaction.atomic(): - for first_path in os.listdir(temp_path): - re_match = constants.PACKAGE_PATH_RE.match(first_path) - if re_match is None: - logger.info("path->[%s] is not match re, jump it." % first_path) - continue - - path_dict = re_match.groupdict() - current_os = path_dict["os"] - cpu_arch = path_dict["cpu_arch"] - logger.info("path->[{}] is match for os->[{}] cpu->[{}]".format(first_path, current_os, cpu_arch)) - - # 遍历第二层的内容,得知当前的插件名 - abs_first_path = os.path.join(temp_path, first_path) - for second_path in os.listdir(abs_first_path): - # 注册新的内容,并触发同步 - # second_path 是包名 - abs_path = os.path.join(abs_first_path, second_path) - - if not os.path.isdir(abs_path): - logger.info("found file path->[%s] jump it" % abs_path) - continue - if select_pkg_abs_paths is not None and f"{first_path}/{second_path}" not in select_pkg_abs_paths: - logger.info("path->[%s] not select, jump it" % abs_path) - continue - record = Packages.create_record( - dir_path=abs_path, - package_os=current_os, - cpu_arch=cpu_arch, - is_release=is_release, - creator=creator, - is_external=path_dict["is_external"] is not None, - is_template_load=is_template_load, - is_template_overwrite=is_template_overwrite, - ) - - logger.info("package->[{}] now add record->[{}] success.".format(self.file_name, record.id)) - package_result.append(record) - - # 3. 完成 - logger.info("now package->[%s] is all add done." % self.file_name) - - # 4. 清理临时文件夹 - shutil.rmtree(temp_path) - logger.info("clean temp path -> [{temp_path}] done.".format(temp_path=temp_path)) - return package_result - - def list_package_infos(self): - """ - 解析`self.file_path`下插件,获取包信息字典列表 - :return: [ - { - # 插件包的相对路径 - "pkg_abs_path": "plugins_linux_x86_64/package_name" - # 插件包目录路径 - "dir_path": "/tmp/12134/plugins_linux_x86_64/package_name", - # 插件所需操作系统 - "package_os": "linux", - # 支持cpu位数 - "cpu_arch": "x86_64", - # 是否为自定义插件 - "is_external": "False" - }, - ... - ] - """ - # 1. 解压压缩文件 - temp_path = "/tmp/%s" % uuid.uuid4().hex - with tarfile.open(self.file_path) as tfile: - # 检查是否存在可疑内容 - for file_info in tfile.getmembers(): - if file_info.name.startswith("/") or "../" in file_info.name: - logger.error( - "WTF? file->[{}] contains member->[{}] try to escape! We won't use it.".format( - self.file_path, file_info.name - ) - ) - raise ValueError(_("文件包含非法路径成员[%s],请检查") % file_info.name) - logger.info("file->[{}] extract to path->[{}] success.".format(self.file_path, temp_path)) - tfile.extractall(path=temp_path) - - package_infos = [] - # 遍历第一层的内容,获取操作系统和cpu架构信息,eg:external(可无,有表示自定义插件)_plugins_linux_x86_64 - for first_plugin_dir_name in os.listdir(temp_path): - # 通过正则提取出插件(plugin)目录名中的插件信息 - re_match = constants.PACKAGE_PATH_RE.match(first_plugin_dir_name) - if re_match is None: - logger.info("path->[%s] is not match re, jump it." % first_plugin_dir_name) - continue - - # 将文件名解析为插件信息字典 - plugin_info_dict = re_match.groupdict() - current_os = plugin_info_dict["os"] - cpu_arch = plugin_info_dict["cpu_arch"] - logger.info( - "path->[{}] is match for os->[{}] cpu->[{}]".format(first_plugin_dir_name, current_os, cpu_arch) - ) - - first_level_plugin_path = os.path.join(temp_path, first_plugin_dir_name) - # 遍历第二层的内容,获取包名, eg:plugins_linux_x86_64/package_name - for second_package_dir_name in os.listdir(first_level_plugin_path): - # 拼接获取包路径 - second_level_package_dir_path = os.path.join(first_level_plugin_path, second_package_dir_name) - if not os.path.isdir(second_level_package_dir_path): - logger.info("found file path->[%s] jump it" % second_level_package_dir_path) - continue - - package_infos.append( - { - "pkg_abs_path": f"{first_plugin_dir_name}/{second_package_dir_name}", - "dir_path": second_level_package_dir_path, - "package_os": current_os, - "cpu_arch": cpu_arch, - "is_external": plugin_info_dict["is_external"] is not None, - } - ) - - return package_infos - class DownloadRecord(models.Model): """ @@ -1825,7 +1391,7 @@ def download_key(self): return md5.hexdigest() @classmethod - def create_record(cls, category, query_params, creator, source_app_code): + def create_record(cls, category: str, query_params: Dict[str, Any], creator: str, source_app_code: str): """ 创建下载任务记录 :param category: 下载文件类型 @@ -1835,13 +1401,6 @@ def create_record(cls, category, query_params, creator, source_app_code): :return: download record """ - if category not in cls.CATEGORY_TASK_DICT: - logger.error( - "user->[%s] from source_app->[%s] request category->[%s] is not supported now, " - "nothing will do." % (creator, source_app_code, category) - ) - raise ValueError(_("请求下载类型[%s]暂不支持,请确认后重试") % category) - record = cls.objects.create( category=category, query_params=json.dumps(query_params), @@ -1853,8 +1412,9 @@ def create_record(cls, category, query_params, creator, source_app_code): source_app_code=source_app_code, ) logger.info( - "download record->[{}] is create from app->[{}] for category->[{}] query_params->[{}]".format( - record.id, source_app_code, category, query_params + "download record -> {record_id} is create from app -> {source_app_code} for category -> {category} " + "query_params -> {query_params}".format( + record_id=record.id, source_app_code=source_app_code, category=category, query_params=query_params ) ) @@ -1879,10 +1439,14 @@ def execute(self): self.file_path = result["file_path"] except Exception as error: - logger.error("failed to execute task->[{}] for->[{}]".format(self.id, traceback.format_exc())) + logger.error( + "failed to execute task -> {record_id} for -> {err_msg}".format( + record_id=self.id, err_msg=traceback.format_exc() + ) + ) task_status = self.TASK_STATUS_FAILED - error_message = _("任务失败: %s") % error + error_message = _("任务失败: {err_msg}").format(err_msg=error) six.raise_from(error, error) @@ -1892,8 +1456,9 @@ def execute(self): self.finish_time = timezone.now() self.save() logger.info( - "task->[%s] is done with status->[%s] error_message->[%s]" - % (self.id, self.task_status, self.error_message) + "task -> {record_id} is done with status -> {task_status} error_message -> {err_msg}".format( + record_id=self.id, task_status=self.task_status, err_msg=self.error_message + ) ) diff --git a/apps/node_man/tests/test_pluginv2.py b/apps/node_man/tests/test_pluginv2.py index 4dc70c7de..c83645ccc 100644 --- a/apps/node_man/tests/test_pluginv2.py +++ b/apps/node_man/tests/test_pluginv2.py @@ -67,30 +67,6 @@ def test_list_plugin(self): }, ) - @patch("apps.node_man.handlers.plugin_v2.requests.post", upload_package_return) - def test_upload(self): - class package_file: - name = "123.txt" - - @classmethod - def chunks(cls): - return [b"test1", b"test2"] - - result = PluginV2Handler().upload(package_file=package_file, module="test_module", username="") - self.assertEqual( - result, - { - "result": True, - "message": "", - "code": "00", - "data": { - "id": 21, # 包上传记录ID - "name": "test-0.01.tgz", # 包名 - "pkg_size": "23412434", # 单位byte - }, - }, - ) - @patch("apps.node_man.handlers.cmdb.CmdbHandler.cmdb_or_cache_biz", cmdb_or_cache_biz) def test_list_plugin_host(self): # 构造数据 diff --git a/apps/node_man/tools/plugin_v2.py b/apps/node_man/tools/plugin_v2.py index d231293b7..f456316f7 100644 --- a/apps/node_man/tools/plugin_v2.py +++ b/apps/node_man/tools/plugin_v2.py @@ -9,7 +9,6 @@ specific language governing permissions and limitations under the License. """ -import hashlib import re import traceback from itertools import groupby @@ -29,18 +28,6 @@ class PluginV2Tools: lower_var_path_pattern = re.compile(r"{{\s*[\w.]+\s*\|\s*lower\s*}}") lower_var_name_pattern = re.compile(r"{{\s*([\w.]+)\s*\|\s*lower\s*}}") - @staticmethod - def get_file_md5(file_name): - hash_md5 = hashlib.md5() - try: - with open(file_name, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_md5.update(chunk) - except IOError: - return "-1" - - return hash_md5.hexdigest() - @classmethod def shield_tpl_unparse_content(cls, config_template_content: str): shield_content = config_template_content diff --git a/apps/node_man/views/plugin_v2.py b/apps/node_man/views/plugin_v2.py index 23a27ef04..ef8df9b58 100644 --- a/apps/node_man/views/plugin_v2.py +++ b/apps/node_man/views/plugin_v2.py @@ -458,11 +458,7 @@ def upload(self, request): ser = self.serializer_class(data=request.data) ser.is_valid(raise_exception=True) data = ser.validated_data - return JsonResponse( - PluginV2Handler.upload( - package_file=data["package_file"], module=data["module"], username=get_request_username() - ) - ) + return JsonResponse(PluginV2Handler.upload(package_file=data["package_file"], module=data["module"])) @action(detail=False, methods=["POST"], serializer_class=plugin_v2.PluginFetchConfigVarsSerializer) def fetch_config_variables(self, request): diff --git a/apps/utils/basic.py b/apps/utils/basic.py index 375c02481..89a2da70f 100644 --- a/apps/utils/basic.py +++ b/apps/utils/basic.py @@ -8,7 +8,6 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import hashlib from collections import Counter, namedtuple from copy import deepcopy from typing import Any, Dict, Iterable, List, Set, Union @@ -72,21 +71,6 @@ def suffix_slash(os, path): return path -def md5(file_name): - """内部实现的平台无关性计算MD5""" - hash = hashlib.md5() - try: - with open(file_name, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - if not chunk: - break - hash.update(chunk) - except IOError: - return "-1" - - return hash.hexdigest() - - def chunk_lists(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): diff --git a/apps/utils/env.py b/apps/utils/env.py index bda8ea42c..659b21cd7 100644 --- a/apps/utils/env.py +++ b/apps/utils/env.py @@ -11,7 +11,7 @@ import logging import os -from typing import Any +from typing import Any, Dict from django.conf import settings @@ -49,10 +49,10 @@ def get_env_list(env_prefix): return result -def get_gse_env_path(package_name, is_windows=False): +def get_gse_env_path(plugin_name: str, is_windows=False) -> Dict[str, str]: """ 获取gse agent的路径信息 - :param package_name: 插件名,因为部分文件配置路径与插件名有关 + :param plugin_name: 插件名,因为部分文件配置路径与插件名有关 :param is_windows: 是否windows环境下的配置 :return: { "install_path": "/usr/local", @@ -66,7 +66,7 @@ def get_gse_env_path(package_name, is_windows=False): return { "install_path": settings.GSE_WIN_AGENT_HOME, "log_path": settings.GSE_WIN_AGENT_LOG_DIR, - "pid_path": settings.GSE_WIN_AGENT_RUN_DIR + "\\" + package_name + ".pid", + "pid_path": settings.GSE_WIN_AGENT_RUN_DIR + "\\" + plugin_name + ".pid", "data_path": settings.GSE_WIN_AGENT_DATA_DIR, } # linux & aix系统下的配置 @@ -74,7 +74,7 @@ def get_gse_env_path(package_name, is_windows=False): return { "install_path": settings.GSE_AGENT_HOME, "log_path": settings.GSE_AGENT_LOG_DIR, - "pid_path": settings.GSE_AGENT_RUN_DIR + "/" + package_name + ".pid", + "pid_path": settings.GSE_AGENT_RUN_DIR + "/" + plugin_name + ".pid", "data_path": settings.GSE_AGENT_DATA_DIR, } diff --git a/apps/utils/files.py b/apps/utils/files.py new file mode 100644 index 000000000..8939bf8e1 --- /dev/null +++ b/apps/utils/files.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import hashlib +import os +import uuid +from typing import IO, Any, Optional + +import requests + +from apps.node_man import constants + + +class FileOpen: + + """ + 文件上下文管理器 + 与open对比,提供 file_obj - IO 作为文件输入的方式 + 读取结束时,指针默认复位,避免影响 file_obj 复用,减少 open 次数 + """ + + closed: bool + # 标记是否通过路径打开 + is_name: bool + file_obj: Optional[IO[Any]] + + def __init__(self, name: str = None, file_obj: Optional[IO[Any]] = None, mode: str = "rb", closed: bool = True): + """ + :param name: 文件路径 + :param file_obj: 已 open 的文件,优先级:file_obj > name + :param mode: 文件访问模式,同 open mode + :param closed: 上下文结束时是否关闭 + """ + if not (file_obj or name): + raise ValueError("nothing to open") + + self.closed = closed + # 标记是否通过路径打开 + self.is_name = not file_obj + + if self.is_name: + self.file_obj = open(name, mode=mode) + else: + self.file_obj = file_obj + + def __enter__(self) -> IO[Any]: + return self.file_obj + + def __exit__(self, exc_type, exc_val, exc_tb): + + # 指针复位,避免 closed=False 场景下,影响上层逻辑对该文件对象的复用 + # 参考:https://stackoverflow.com/questions/3906137/why-cant-i-call-read-twice-on-an-open-file + self.file_obj.seek(0) + + # 通过路径open的文件对象必须关闭 + # 传入的文件对象由上层逻辑决定是否显式传入不关闭 + if self.is_name or (not self.is_name and self.closed): + self.file_obj.close() + + +def md5sum(name: str = None, file_obj: Optional[IO[Any]] = None, mode: str = "rb", closed: bool = True) -> str: + """ + 计算文件md5 + :param name: 文件路径 + :param file_obj: 已打开的文件文件对象,同时传 name 和 file_obj 后者优先使用 + :param mode: 文件打开模式,具体参考 open docstring,默认 rb + :param closed: 是否返回时关闭文件对象,安全起见默认关闭 + :return: md5 str or "-1" + """ + + hash_md5 = hashlib.md5() + + with FileOpen(name=name, file_obj=file_obj, mode=mode, closed=closed) as fs: + for chunk in iter(lambda: fs.read(4096), b""): + if not chunk: + continue + hash_md5.update(chunk) + + return hash_md5.hexdigest() + + +def download_file( + url: str, + name: str = None, + file_obj: Optional[IO[Any]] = None, + mode: str = "wb", + closed: bool = True, +) -> None: + + """ + 下载文件 + :param url: 下载url + :param name: 写入目标路径 + :param file_obj: 已打开的写入目标文件对象 + :param mode: 文件打开模式,具体参考 open docstring,默认 rb + :param closed: 是否返回时关闭文件对象,安全起见默认关闭 + :return: None + """ + + with requests.get(url=url, stream=True) as rfs: + + rfs.raise_for_status() + + with FileOpen(name=name, file_obj=file_obj, mode=mode, closed=closed) as local_fs: + for chunk in rfs.iter_content(chunk_size=4096): + if not chunk: + continue + local_fs.write(chunk) + + +def mk_and_return_tmpdir() -> str: + """ + 创建并返回临时目录 + :return: + """ + tmp_dir = os.path.join(constants.TMP_DIR, constants.TMP_FILE_NAME_FORMAT.format(name=uuid.uuid4().hex)) + os.makedirs(tmp_dir, exist_ok=True) + return tmp_dir diff --git a/apps/utils/tests/test_env.py b/apps/utils/tests/test_env.py new file mode 100644 index 000000000..26665da39 --- /dev/null +++ b/apps/utils/tests/test_env.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import os +import uuid + +from apps.utils import env +from apps.utils.unittest.testcase import CustomBaseTestCase + + +class TestEnv(CustomBaseTestCase): + def test_get_type_env(self): + def _test_bool_type(): + _cases = [ + {"key": uuid.uuid4().hex, "value": "True", "except_value": True}, + {"key": uuid.uuid4().hex, "value": "False", "except_value": False}, + {"key": uuid.uuid4().hex, "value": "false", "except_value": False}, + {"key": uuid.uuid4().hex, "value": "true", "except_value": True}, + ] + for _case in _cases: + os.environ[_case["key"]] = _case["value"] + self.assertEqual(env.get_type_env(key=_case["key"], _type=bool), _case["except_value"]) + + os.environ.pop(_case["key"]) + + _not_bool_key = uuid.uuid4().hex + os.environ[_not_bool_key] = "not bool" + self.assertRaises(ValueError, env.get_type_env, key=_not_bool_key, _type=bool) + os.environ.pop(_not_bool_key) + + def _test_int_type(): + _numbers = [0, -1, 1, 1234567890] + for _number in _numbers: + _key = uuid.uuid4().hex + os.environ[_key] = str(_number) + self.assertEqual(env.get_type_env(key=_key, _type=int), _number) + os.environ.pop(_key) + + def _test_str_type(): + _strings = ["1", "2"] + for _string in _strings: + _key = uuid.uuid4().hex + os.environ[_key] = str(_string) + self.assertEqual(env.get_type_env(key=_key, _type=str), _string) + os.environ.pop(_key) + + def _test_get_default(): + _default_values = [10, False, "TYPE"] + for _default_value in _default_values: + self.assertEqual(env.get_type_env(key=uuid.uuid4().hex, default=_default_value), _default_value) + + _test_bool_type() + _test_int_type() + _test_str_type() + _test_get_default() diff --git a/config/default.py b/config/default.py index 63714d80f..89012f7a5 100644 --- a/config/default.py +++ b/config/default.py @@ -10,6 +10,7 @@ """ import sys from distutils.util import strtobool +from enum import Enum from blueapps.conf.default_settings import * # noqa @@ -287,6 +288,76 @@ CACHES["default"] = CACHES["db"] + +# ============================================================================== +# 文件存储 +# ============================================================================== + + +class StorageType(Enum): + """文件存储类型""" + + # 本地文件系统 + FILE_SYSTEM = "FILE_SYSTEM" + + # 制品库 + BKREPO = "BKREPO" + + +# 用于控制默认的文件存储类型 +# 更多类型参考 apps.node_man.constants.STORAGE_TYPE +STORAGE_TYPE = os.getenv("STORAGE_TYPE", StorageType.FILE_SYSTEM.value) + +# 是否覆盖同名文件 +FILE_OVERWRITE = get_type_env("FILE_OVERWRITE", _type=bool, default=False) + +# 节点管理后台外网域名,用于构造文件导入导出的API URL +BACKEND_HOST = os.getenv("BKAPP_BACKEND_HOST", "") + +BKREPO_USERNAME = os.getenv("BKREPO_USERNAME") +BKREPO_PASSWORD = os.getenv("BKREPO_PASSWORD") +BKREPO_PROJECT = os.getenv("BKREPO_PROJECT") +# 默认文件存放仓库 +BKREPO_BUCKET = os.getenv("BKREPO_BUCKET") +# 对象存储平台域名 +BKREPO_ENDPOINT_URL = os.getenv("BKREPO_ENDPOINT_URL") + +# 存储类型 - storage class 映射关系 +STORAGE_TYPE_IMPORT_PATH_MAP = { + StorageType.FILE_SYSTEM.value: "apps.core.files.storage.AdminFileSystemStorage", + StorageType.BKREPO.value: "apps.core.files.storage.CustomBKRepoStorage", +} + +# 默认的file storage +DEFAULT_FILE_STORAGE = STORAGE_TYPE_IMPORT_PATH_MAP[STORAGE_TYPE] + +# 本地文件系统上传文件后台API +FILE_SYSTEM_UPLOAD_API = f"{BACKEND_HOST}/backend/package/upload/" + +# 对象存储上传文件后台API +COS_UPLOAD_API = f"{BACKEND_HOST}/backend/package/upload_cos/" + +# 暂时存在多个上传API的原因:原有文件上传接口被Nginx转发 +STORAGE_TYPE_UPLOAD_API_MAP = { + StorageType.FILE_SYSTEM.value: FILE_SYSTEM_UPLOAD_API, + StorageType.BKREPO.value: COS_UPLOAD_API, +} + +DEFAULT_FILE_UPLOAD_API = STORAGE_TYPE_UPLOAD_API_MAP[STORAGE_TYPE] + +BKAPP_NODEMAN_DOWNLOAD_API = f"{BACKEND_HOST}/backend/export/download/" + +PUBLIC_PATH = os.getenv("BKAPP_PUBLIC_PATH") or "/data/bkee/public/bknodeman/" + +# NGINX miniweb路径 +DOWNLOAD_PATH = os.path.join(PUBLIC_PATH, "download") + +# 上传文件的保存位置 +UPLOAD_PATH = os.path.join(PUBLIC_PATH, "upload") + +# 下载文件路径 +EXPORT_PATH = os.path.join(PUBLIC_PATH, "export") + # ============================================================================== # 后台配置 # ============================================================================== @@ -453,32 +524,12 @@ } LOGGING["loggers"]["iam"] = {"handlers": ["iam"], "level": LOGGING["loggers"]["root"]["level"], "propagate": True} -PUBLIC_PATH = os.getenv("BKAPP_PUBLIC_PATH") or "/data/bkee/public/bknodeman/" - -# 上传文件的保存位置 -UPLOAD_PATH = os.path.join(PUBLIC_PATH, "upload") - -# 下载文件路径 -EXPORT_PATH = os.path.join(PUBLIC_PATH, "export") - -# NGINX miniweb路径 -NGINX_DOWNLOAD_PATH = os.path.join(PUBLIC_PATH, "download") - # 节点管理后台 LAN_IP BKAPP_LAN_IP = os.getenv("LAN_IP") # 节点管理后台 NFS_IP BKAPP_NFS_IP = os.getenv("NFS_IP") or BKAPP_LAN_IP -# 节点管理后台外网域名 -# TODO: 需要部署侧提供 -BACKEND_HOST = os.getenv("BKAPP_BACKEND_HOST", "") - -# 文件上传接口 -BKAPP_NODEMAN_UPLOAD_URL = f"{BACKEND_HOST}/backend/package/upload/" - -BKAPP_NODEMAN_DOWNLOAD_URL = f"{BACKEND_HOST}/backend/export/download/" - # 节点管理回调地址 BKAPP_NODEMAN_CALLBACK_URL = os.getenv("BKAPP_NODEMAN_CALLBACK_URL", "") BKAPP_NODEMAN_OUTER_CALLBACK_URL = os.getenv("BKAPP_NODEMAN_OUTER_CALLBACK_URL", "") @@ -508,6 +559,7 @@ VERSION_LOG = {"MD_FILES_DIR": os.path.join(PROJECT_ROOT, "release")} + # remove disabled apps if locals().get("DISABLED_APPS"): INSTALLED_APPS = locals().get("INSTALLED_APPS", []) diff --git a/dev_log/2.1.341/crayon_202108241642.yaml b/dev_log/2.1.341/crayon_202108241642.yaml new file mode 100644 index 000000000..473223ed8 --- /dev/null +++ b/dev_log/2.1.341/crayon_202108241642.yaml @@ -0,0 +1,2 @@ +feature: + - "插件包管理支持对象存储模式 (#2)" diff --git a/requirements.txt b/requirements.txt index 6a02a0c1e..ccceb1231 100644 --- a/requirements.txt +++ b/requirements.txt @@ -61,3 +61,5 @@ prettytable==2.1.0 raven==6.1.0 # for apm ddtrace==0.14.1 + +bkstorages==1.0.1