Skip to content

Commit

Permalink
feature: 插件包管理支持对象存储模式,公共方法优化整合 (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon authored and zhangzhw8 committed Sep 22, 2021
1 parent 04ff2b8 commit e548072
Show file tree
Hide file tree
Showing 37 changed files with 1,471 additions and 825 deletions.
10 changes: 6 additions & 4 deletions apps/backend/agent/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def push_files_to_proxy(self, file):
type=Var.PLAIN,
value=[{"ip": self.host_info["bk_host_innerip"], "bk_cloud_id": self.host_info["bk_cloud_id"]}],
)
act.component.inputs.file_target_path = Var(type=Var.PLAIN, value=settings.NGINX_DOWNLOAD_PATH)
act.component.inputs.file_target_path = Var(type=Var.PLAIN, value=settings.DOWNLOAD_PATH)
act.component.inputs.files = Var(type=Var.PLAIN, value=file["files"])
act.component.inputs.from_type = Var(type=Var.PLAIN, value=file.get("from_type", ""))
act.component.inputs.context = Var(type=Var.PLAIN, value="")
Expand All @@ -370,7 +370,7 @@ def start_nginx(self):
with open(path, encoding="utf-8") as fh:
script = fh.read()
script_content = script % {
"nginx_path": settings.NGINX_DOWNLOAD_PATH,
"nginx_path": settings.DOWNLOAD_PATH,
"bk_nodeman_nginx_download_port": settings.BK_NODEMAN_NGINX_DOWNLOAD_PORT,
"bk_nodeman_nginx_proxy_pass_port": settings.BK_NODEMAN_NGINX_PROXY_PASS_PORT,
}
Expand Down Expand Up @@ -451,14 +451,16 @@ def _operate_process(

setup_path = path_handler.join(
package.proc_control.install_path,
"external_plugins",
const.PluginChildDir.EXTERNAL.value,
group_id,
package.project,
)
pid_path_prefix, pid_filename = path_handler.split(package.proc_control.pid_path)
pid_path = path_handler.join(pid_path_prefix, group_id, pid_filename)
else:
setup_path = path_handler.join(package.proc_control.install_path, "plugins", "bin")
setup_path = path_handler.join(
package.proc_control.install_path, const.PluginChildDir.OFFICIAL.value, "bin"
)
pid_path = package.proc_control.pid_path

act = AgentServiceActivity(
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/agent/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def gen_commands(host: models.Host, pipeline_id: str, is_uninstall: bool) -> Ins
dest_dir = host.agent_config["temp_path"]
dest_dir = suffix_slash(host.os_type.lower(), dest_dir)
if script_file_name == constants.SetupScriptFileName.SETUP_PAGENT_PY.value:
run_cmd_params.append(f"-L {settings.NGINX_DOWNLOAD_PATH}")
run_cmd_params.append(f"-L {settings.DOWNLOAD_PATH}")
# 云区域自动安装
upstream_nodes = [proxy.inner_ip for proxy in host.proxies]
host.upstream_nodes = proxies
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/components/collections/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ def execute(self, data, parent_data):
self.logger.info(_("开始下发升级包"))
host_info = data.get_one_of_inputs("host_info")
host = Host.get_by_host_info(host_info)
nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH
nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH
data.inputs.file_target_path = host.agent_config["temp_path"]

os_type = host.os_type.lower()
Expand Down Expand Up @@ -1269,7 +1269,7 @@ def _execute(self, data, parent_data):
else:
path_handler = posixpath

setup_path = path_handler.join(package.proc_control.install_path, "plugins", "bin")
setup_path = path_handler.join(package.proc_control.install_path, const.PluginChildDir.OFFICIAL.value, "bin")
pid_path = package.proc_control.pid_path

result = gse_client.register_process(hosts, control, setup_path, pid_path, plugin_name, plugin_name)
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/components/collections/bulk_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def execute(self, data, parent_data):
self.logger.info(_("开始下发升级包"))
host_info = data.get_one_of_inputs("host_info")
host = models.Host.get_by_host_info(host_info)
nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH
nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH
data.inputs.file_target_path = host.agent_config["temp_path"]

os_type = host.os_type.lower()
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/components/collections/bulk_job_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def execute(self, data, parent_data):
self.logger.info(_("开始下发升级包"))
host_info = data.get_one_of_inputs("host_info")
host = models.Host.get_by_host_info(host_info)
nginx_path = host.ap.nginx_path or settings.NGINX_DOWNLOAD_PATH
nginx_path = host.ap.nginx_path or settings.DOWNLOAD_PATH
data.inputs.file_target_path = host.agent_config["temp_path"]

os_type = host.os_type.lower()
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/components/collections/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ def execute(self, data, parent_data):

data.inputs.file_source = [
{
"files": [f"{settings.NGINX_DOWNLOAD_PATH}/{file}" for file in files],
"files": [f"{settings.DOWNLOAD_PATH}/{file}" for file in files],
"account": "root",
"ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}],
}
Expand Down
8 changes: 5 additions & 3 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ def get_plugins_paths(
pid_filename = f"{plugin_name}.pid"
if package.plugin_desc.category == constants.CategoryType.external:
# 如果为 external 第三方插件,需要补上插件组目录
setup_path = path_handler.join(ap_config["setup_path"], "external_plugins", group_id, package.project)
setup_path = path_handler.join(
ap_config["setup_path"], constants.PluginChildDir.EXTERNAL.value, group_id, package.project
)

if subscription.category == subscription.CategoryType.DEBUG:
# debug模式下特殊处理这些路径
Expand All @@ -289,7 +291,7 @@ def get_plugins_paths(
log_path = path_handler.join(ap_config["log_path"], group_id)
data_path = path_handler.join(ap_config["data_path"], group_id)
else:
setup_path = path_handler.join(ap_config["setup_path"], "plugins", "bin")
setup_path = path_handler.join(ap_config["setup_path"], constants.PluginChildDir.OFFICIAL.value, "bin")
pid_path = path_handler.join(ap_config["run_path"], pid_filename)
log_path = ap_config["log_path"]
data_path = ap_config["data_path"]
Expand Down Expand Up @@ -386,7 +388,7 @@ def _execute(self, data, parent_data, common_data: CommonData):
# 如 linux-arm、linux-x86、windows-x86 的插件,需分为三组
# 把多个IP合并为一个任务,可以利用GSE文件管道的BT能力,提高传输效率
jobs: Dict[str, Dict[str, Union[list, str]]] = defaultdict(lambda: defaultdict(list))
nginx_path = settings.NGINX_DOWNLOAD_PATH
nginx_path = settings.DOWNLOAD_PATH
for process_status in process_statuses:
bk_host_id = process_status.bk_host_id
subscription_instance = group_id_instance_map.get(process_status.group_id)
Expand Down
10 changes: 10 additions & 0 deletions apps/backend/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,13 @@ class GenCommandsError(BackendBaseException):
class GseEncryptedError(BackendBaseException):
MESSAGE = _("GSE敏感信息加密失败")
ERROR_CODE = 8


class PluginParseError(BackendBaseException):
MESSAGE = _("插件解析错误")
ERROR_CODE = 9


class CreatePackageRecordError(BackendBaseException):
MESSAGE = _("归档插件包信息错误")
ERROR_CODE = 10
2 changes: 1 addition & 1 deletion apps/backend/management/commands/copy_file_to_nginx.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def handle(self, *args, **options):
# 接入点配置的nginx路径
nginx_paths = [ap.nginx_path for ap in AccessPoint.objects.all() if ap.nginx_path]
# 默认nginx路径
nginx_paths.append(settings.NGINX_DOWNLOAD_PATH)
nginx_paths.append(settings.DOWNLOAD_PATH)
# 去重
nginx_paths = list(set(nginx_paths))
for _path in os.listdir(settings.BK_SCRIPTS_PATH):
Expand Down
10 changes: 6 additions & 4 deletions apps/backend/management/commands/init_official_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
from django.core.management.base import BaseCommand
from django.db.transaction import atomic

from apps.backend.plugin import tools
from apps.node_man import models
from apps.utils.basic import md5
from apps.utils.files import md5sum
from common.log import logger


Expand Down Expand Up @@ -59,7 +60,7 @@ def handle(self, *args, **options):
# 后续可以考虑通过路径来判断
module="gse_plugin",
file_path=file_abs_path,
md5=md5(file_abs_path),
md5=md5sum(name=file_abs_path),
operator="system",
source_app_code="bk_nodeman",
file_name=file_name,
Expand All @@ -68,10 +69,11 @@ def handle(self, *args, **options):

try:
# 如果是官方内置的插件,那么应该是直接发布的
package_list = upload_record.create_package_records(
package_list = tools.create_package_records(
file_path=upload_record.file_path,
file_name=upload_record.file_name,
is_release=True,
is_template_load=True,
is_template_overwrite=True,
)
except Exception as error:
# 但是需要注意这个文件可能是已经存在的文件,会有导入失败的问题
Expand Down
58 changes: 45 additions & 13 deletions apps/backend/plugin/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
import base64
import hashlib

from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
from rest_framework.exceptions import ValidationError

from apps.node_man import constants
from apps.node_man.models import GsePluginDesc, Packages
from apps.node_man.models import DownloadRecord, GsePluginDesc, Packages


class GatewaySerializer(serializers.Serializer):
Expand Down Expand Up @@ -202,14 +203,28 @@ def validate(self, attrs):
return attrs


class UploadInfoSerializer(GatewaySerializer):
class UploadInfoBaseSerializer(GatewaySerializer):
md5 = serializers.CharField(help_text=_("上传端计算的文件md5"), max_length=32)
file_name = serializers.CharField(help_text=_("上传端提供的文件名"), min_length=1)
module = serializers.CharField(max_length=32, required=False, default="gse_plugin")


class UploadInfoSerializer(UploadInfoBaseSerializer):
"""上传插件包接口序列化器"""

module = serializers.CharField(max_length=32, required=False, default="gse_plugin")
md5 = serializers.CharField(max_length=32)
file_name = serializers.CharField()
file_local_path = serializers.CharField(max_length=512)
file_local_md5 = serializers.CharField(max_length=32)
file_local_path = serializers.CharField(help_text=_("本地文件路径"), max_length=512)
file_local_md5 = serializers.CharField(help_text=_("Nginx所计算的文件md5"), max_length=32)


class CosUploadInfoSerializer(UploadInfoBaseSerializer):
download_url = serializers.URLField(help_text=_("对象存储文件下载url"), required=False)
file_path = serializers.CharField(help_text=_("文件保存路径"), min_length=1, required=False)

def validate(self, attrs):
# 两种参数模式少要有一种满足
if not ("download_url" in attrs or "file_path" in attrs):
raise ValidationError("at least has download_url or file_path")
return attrs


class PluginStartDebugSerializer(GatewaySerializer):
Expand Down Expand Up @@ -238,17 +253,25 @@ def validate(self, attrs):


class PluginRegisterSerializer(GatewaySerializer):
file_name = serializers.CharField()
is_release = serializers.BooleanField()
file_name = serializers.CharField(help_text=_("文件名称"))
is_release = serializers.BooleanField(help_text=_("是否立即发布该插件"))

# 两个配置文件相关参数选填,兼容监控
# 是否需要读取配置文件
is_template_load = serializers.BooleanField(required=False, default=False)
# 是否可以覆盖已经存在的配置文件
is_template_overwrite = serializers.BooleanField(required=False, default=False)
is_template_load = serializers.BooleanField(help_text=_("是否需要读取配置文件"), required=False, default=False)
is_template_overwrite = serializers.BooleanField(help_text=_("是否可以覆盖已经存在的配置文件"), required=False, default=False)

# TODO 废弃字段,改用 select_pkg_relative_paths,待与前端联调后移除该字段
select_pkg_abs_paths = serializers.ListField(required=False, min_length=1, child=serializers.CharField())

select_pkg_relative_paths = serializers.ListField(
required=False, min_length=1, child=serializers.CharField(), help_text=_("选择注册的插件包相对路径,缺省默认全选")
)

def validate(self, attrs):
attrs["select_pkg_relative_paths"] = attrs.get("select_pkg_abs_paths")

return attrs


class PluginRegisterTaskSerializer(GatewaySerializer):
job_id = serializers.IntegerField()
Expand Down Expand Up @@ -287,6 +310,15 @@ def validate(self, data):
creator = serializers.CharField()
bk_app_code = serializers.CharField()

def validate(self, attrs):
if attrs["category"] not in DownloadRecord.CATEGORY_TASK_DICT:
raise ValidationError(
"请求下载类型 -> {category} 暂不支持,可选项 -> {choices}".format(
category=attrs["category"], choices=DownloadRecord.CATEGORY_CHOICES
)
)
return attrs


class DeletePluginSerializer(GatewaySerializer):
name = serializers.CharField()
Expand Down
14 changes: 8 additions & 6 deletions apps/backend/plugin/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from django.utils.translation import ugettext as _

from apps.backend.celery import app
from apps.backend.plugin import tools
from apps.backend.utils.pipeline_parser import PipelineParser as CustomPipelineParser
from apps.node_man import constants as const
from apps.node_man import models
Expand All @@ -41,23 +42,24 @@ def package_task(job_id, task_params):
job = models.Job.objects.get(id=job_id, job_type=const.JobType.PACKING_PLUGIN)

except models.Job.DoesNotExist:
logger.error("try to execute job->[%s] but is not exists")
logger.error("try to execute job-> {job_id} but is not exists".format(job_id=job_id))
return False

try:
file_name = task_params["file_name"]
is_release = task_params["is_release"]
select_pkg_abs_paths = task_params.get("select_pkg_abs_paths")
select_pkg_relative_paths = task_params.get("select_pkg_relative_paths")
# 使用最后的一条上传记录
upload_package_object = models.UploadPackage.objects.filter(file_name=file_name).order_by("-upload_time")[0]

# 2. 执行任务
upload_package_object.create_package_records(
tools.create_package_records(
file_path=upload_package_object.file_path,
file_name=upload_package_object.file_name,
is_release=is_release,
creator=task_params["bk_username"],
select_pkg_abs_paths=select_pkg_abs_paths,
select_pkg_relative_paths=select_pkg_relative_paths,
is_template_load=task_params.get("is_template_load", False),
is_template_overwrite=task_params.get("is_template_overwrite", False),
)

except PermissionError:
Expand Down Expand Up @@ -85,7 +87,7 @@ def package_task(job_id, task_params):
job.save()

if job.status == const.JobStatusType.SUCCESS:
logger.info("task->[%s] has finish all job." % job.id)
logger.info("task -> {job_id} has finish all job.".format(job_id=job.id))


@app.task(queue="backend")
Expand Down
Loading

0 comments on commit e548072

Please sign in to comment.