Skip to content

Commit

Permalink
feature: 更新Proxy文件兼容制品库 (close TencentBlueKing#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW committed Oct 8, 2021
1 parent 9e70f62 commit 4f64ed7
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 49 deletions.
5 changes: 3 additions & 2 deletions apps/backend/management/commands/update_proxy_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@


from django.core.management.base import BaseCommand
from apps.node_man.periodic_tasks import update_proxy_file

from apps.node_man.periodic_tasks import update_proxy_files


class Command(BaseCommand):
    """Management command: synchronously run the proxy-file sync task.

    The periodic task was renamed ``update_proxy_file`` -> ``update_proxy_files``;
    only the new name is imported at the top of this module, so only it is called
    here (the stale call to the old name would raise NameError).
    """

    def handle(self, *args, **options):
        # Delegate straight to the periodic task body; no arguments are needed.
        update_proxy_files()
11 changes: 11 additions & 0 deletions apps/backend/tests/components/collections/agent/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,17 @@ def __init__(
self.job.get_job_instance_status = MagicMock(return_value=get_job_instance_status_return)


class JobDemandMock:
    """Test double for JobDemand.

    ``poll_task_result`` ignores whatever arguments it is called with and
    always returns the canned payload handed to the constructor
    (``None`` by default).
    """

    def __init__(self, poll_task_result_return=None):
        canned_payload = poll_task_result_return
        self.poll_task_result = MagicMock(return_value=canned_payload)


class StorageMock:
    """Test double for a storage backend.

    Both ``get_file_md5`` and ``fast_transfer_file`` are MagicMocks that
    swallow any call arguments and return the value supplied for them at
    construction time (``None`` by default).
    """

    def __init__(self, get_file_md5_return=None, fast_transfer_file_return=None):
        canned_returns = {
            "get_file_md5": get_file_md5_return,
            "fast_transfer_file": fast_transfer_file_return,
        }
        for method_name, return_value in canned_returns.items():
            setattr(self, method_name, MagicMock(return_value=return_value))


class AgentTestObjFactory:
@classmethod
def replace_obj_attr_values(cls, obj, obj_attr_values):
Expand Down
3 changes: 2 additions & 1 deletion apps/backend/tests/components/collections/job/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ def __init__(
class JobV3MockApi:
    """Test double for the JobV3 API client.

    The diff residue left two ``__init__`` header lines in this block; only the
    newer, backward-compatible signature (adding ``fast_execute_script_return``
    with a default of ``None``) is kept.
    """

    # Default payload returned by fast_transfer_file when no override is given.
    fast_transfer_file_return = JOB_EXECUTE_TASK_RETURN["data"]

    def __init__(self, fast_transfer_file_return: Dict = None, fast_execute_script_return: Dict = None):
        # Fall back to the class-level canned payload when no override is supplied.
        self.fast_transfer_file = MagicMock(return_value=fast_transfer_file_return or self.fast_transfer_file_return)
        self.fast_execute_script = MagicMock(return_value=fast_execute_script_return)
121 changes: 121 additions & 0 deletions apps/backend/tests/periodic_tasks/test_update_proxy_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import os
import random
import shutil
from typing import Optional

import mock
from django.conf import settings
from django.core.management import call_command
from mock import patch

from apps.backend.tests.components.collections.agent import utils
from apps.backend.tests.components.collections.job import utils as job_utils
from apps.core.files import constants as core_const
from apps.node_man import constants
from apps.node_man.tests.utils import create_cloud_area, create_host
from apps.utils import files
from apps.utils.files import md5sum
from apps.utils.unittest.testcase import CustomBaseTestCase

# Canned response for JobApi.fast_execute_script — mimics the identifiers the
# real JobV3 API returns when kicking off a script job.
FAST_EXECUTE_SCRIPT = {
    "job_instance_name": "API Quick execution script1521100521303",
    "job_instance_id": 10000,
    "step_instance_id": 10001,
}


# Download directory used by the tests; the file-system test overwrites it
# with a fresh temp dir at run time.
DOWNLOAD_PATH: Optional[str] = None
# Mapping of file name -> local md5; mutated by test_file_system_update after
# the first sync so the second run sees no differences.
MD5SUM = {}

# GSE agent status payload: the single test proxy reports ALIVE so it stays in
# the push-target list.
GET_AGENT_STATUS = {
    f"{constants.DEFAULT_CLOUD}:{utils.TEST_IP}": {
        "ip": utils.TEST_IP,
        "bk_cloud_id": constants.DEFAULT_CLOUD,
        "bk_agent_alive": constants.BkAgentStatus.ALIVE,
    }
}

# Finished poll result whose log content reports a proxy-side md5 map that
# does not match the locally generated files — presumably this is what forces
# the first run to push files (TODO confirm against update_proxy_files).
POLL_RESULT = {
    "is_finished": True,
    "task_result": {
        "success": [
            {"ip": utils.TEST_IP, "bk_cloud_id": constants.DEFAULT_CLOUD, "log_content": '{"7z.exe": "1sd124bsdbsdb1"}'}
        ],
        "pending": [],
        "failed": [],
    },
}

# Module-level mock clients; patched into the periodic task module in setUp().
Job_Mock_Client = job_utils.JobV3MockApi(
    fast_execute_script_return=FAST_EXECUTE_SCRIPT,
)

Gse_Mock_Client = utils.GseMockClient(
    get_agent_status_return=GET_AGENT_STATUS,
)

Storage_Mock_Client = utils.StorageMock(get_file_md5_return=MD5SUM, fast_transfer_file_return=10001)

Job_Demand_Mock_Client = utils.JobDemandMock(poll_task_result_return=POLL_RESULT)

# Django settings values overridden for the duration of each test case.
OVERWRITE_OBJ__KV_MAP = {
    settings: {
        "DOWNLOAD_PATH": "/tmp",
        "STORAGE_TYPE": "FILE_SYSTEM",
        "BKREPO_BUCKET": "bucket",
        "BKREPO_PROJECT": "project",
    }
}


class TestUpdateProxyFile(CustomBaseTestCase):
    """Tests for the ``update_proxy_file`` management command under both storage backends."""

    # Flatten FILES_TO_PUSH_TO_PROXY into the individual file names that get pushed.
    download_files = [c for f in constants.FILES_TO_PUSH_TO_PROXY for c in f["files"]]

    def setUp(self) -> None:
        # Fixture hosts: one proxy with UNKNOWN proc status and one alive proxy.
        create_cloud_area(number=5, creator="admin")
        alive_number, unknown_number = 1, 1
        host_to_create, _, _ = create_host(number=unknown_number, node_type="PROXY", proc_type="UNKNOWN")
        create_host(number=alive_number, node_type="PROXY", bk_host_id=20)
        # Replace the periodic task's external collaborators with module-level mocks.
        # NOTE(review): these patches are started but never stopped, so they leak
        # across test cases — consider self.addCleanup(patch.stopall).
        patch("apps.node_man.periodic_tasks.update_proxy_file.client_v2", Gse_Mock_Client).start()
        patch("apps.node_man.periodic_tasks.update_proxy_file.JobApi", Job_Mock_Client).start()
        patch("apps.node_man.periodic_tasks.update_proxy_file.JobDemand", Job_Demand_Mock_Client).start()
        patch("apps.node_man.periodic_tasks.update_proxy_file.get_storage", mock.MagicMock(Storage_Mock_Client)).start()

    def test_file_system_update(self):
        # Use a fresh temp dir as DOWNLOAD_PATH so the test fully controls its contents.
        self.DOWNLOAD_PATH = files.mk_and_return_tmpdir()
        OVERWRITE_OBJ__KV_MAP[settings]["DOWNLOAD_PATH"] = self.DOWNLOAD_PATH
        with self.settings(
            DOWNLOAD_PATH=OVERWRITE_OBJ__KV_MAP[settings]["DOWNLOAD_PATH"],
            STORAGE_TYPE=core_const.StorageType.FILE_SYSTEM.value,
        ):
            # Create every pushed file locally with random content and size.
            for file in self.download_files:
                size = random.randint(99, 2000)
                with open(os.path.join(settings.DOWNLOAD_PATH, file), "wb") as e:
                    e.write(os.urandom(size))
            # Local and proxy md5s differ -> files are synced.
            self.assertIsNone(call_command("update_proxy_file"))
            for file in self.download_files:
                MD5SUM.update({file: md5sum(os.path.join(settings.DOWNLOAD_PATH, file))})
            # No differences any more -> nothing to transfer.
            self.assertIsNone(call_command("update_proxy_file"))
            shutil.rmtree(settings.DOWNLOAD_PATH)

    def test_blueking_artifactory_update(self):
        # Artifactory (bkrepo) backend: md5 lookups go through the mocked storage client.
        with self.settings(
            DOWNLOAD_PATH=OVERWRITE_OBJ__KV_MAP[settings]["DOWNLOAD_PATH"],
            STORAGE_TYPE=core_const.StorageType.BLUEKING_ARTIFACTORY.value,
            BKREPO_PROJECT=OVERWRITE_OBJ__KV_MAP[settings]["BKREPO_PROJECT"],
            BKREPO_BUCKET=OVERWRITE_OBJ__KV_MAP[settings]["BKREPO_BUCKET"],
        ):
            self.assertIsNone(call_command("update_proxy_file"))
5 changes: 5 additions & 0 deletions apps/core/files/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ class FilesRegisterFileSourceError(CoreBaseException):
class FilesTransferError(CoreBaseException):
    """Raised when transferring files to target hosts fails."""

    MESSAGE = _("文件传输失败")
    ERROR_CODE = 4


class FilesNotFoundError(CoreBaseException):
    """Raised when a requested file does not exist in the storage backend."""

    MESSAGE = _("文件不存在")
    ERROR_CODE = 5
15 changes: 15 additions & 0 deletions apps/core/files/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from django.utils.functional import cached_property

from apps.utils.basic import filter_values
from apps.utils.files import md5sum

from . import constants
from .base import BaseStorage
from .exceptions import FilesNotFoundError
from .file_source import BkJobFileSourceManager


Expand Down Expand Up @@ -72,6 +74,13 @@ def __init__(
def path(self, name):
raise NotImplementedError()

def get_file_md5(self, file_name: str) -> str:
if not self.exists(name=file_name):
raise FilesNotFoundError(f"{self.project_id}/{self.bucket}/{file_name} not exist.")
file_metadata = self.get_file_metadata(key=file_name)
file_md5 = file_metadata["X-Checksum-Md5"]
return file_md5

def _handle_file_source_list(
self, file_source_list: List[Dict[str, Any]], extra_transfer_file_params: Dict[str, Any]
) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -139,6 +148,12 @@ def __init__(
def path(self, name):
return os.path.join(self.location, name)

def get_file_md5(self, file_name: str) -> str:
if not os.path.isfile(file_name):
raise FileExistsError(f"{file_name} not exist.")
file_md5 = md5sum(file_name)
return file_md5

@cached_property
def location(self):
"""路径指向 / ,重写前路径指向「项目根目录」"""
Expand Down
2 changes: 1 addition & 1 deletion apps/node_man/periodic_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .sync_cmdb_cloud_area import sync_cmdb_cloud_area # noqa
from .sync_cmdb_host import sync_cmdb_host # noqa
from .sync_proc_status_task import sync_proc_status_task # noqa
from .update_proxy_file import update_proxy_file # noqa
from .update_proxy_file import update_proxy_files # noqa

if getattr(settings, "CONFIG_POLICY_BY_TENCENT_VPC", False):
from .configuration_policy import configuration_policy # noqa
98 changes: 53 additions & 45 deletions apps/node_man/periodic_tasks/update_proxy_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from django.conf import settings

from apps.backend.api.errors import JobPollTimeout
from apps.backend.api.job import JobClient
from apps.component.esbclient import client_v2
from apps.node_man import constants as const
from apps.core.files.exceptions import FilesNotFoundError
from apps.core.files.storage import get_storage
from apps.node_man import constants
from apps.node_man.models import Host
from apps.utils.files import md5sum
from apps.node_man.periodic_tasks.utils import JobDemand
from common.api import JobApi
from common.log import logger


Expand All @@ -31,37 +33,42 @@
options={"queue": "default"},
run_every=crontab(hour="1", minute="0", day_of_week="*", day_of_month="*", month_of_year="*"),
)
def update_proxy_file():
def update_proxy_files():
proxy_hosts = [
{
"ip": host.inner_ip,
"bk_cloud_id": host.bk_cloud_id,
}
for host in Host.objects.filter(node_type=const.NodeType.PROXY)
{"ip": host["inner_ip"], "bk_cloud_id": host["bk_cloud_id"]}
for host in Host.objects.filter(node_type=constants.NodeType.PROXY).values("inner_ip", "bk_cloud_id")
]
if not proxy_hosts:
return

# 实时查询PROXY状态
agent_status_data = client_v2.gse.get_agent_status({"hosts": proxy_hosts})
for key, host_info in agent_status_data.items():
if host_info["bk_agent_alive"] != const.BkAgentStatus.ALIVE:
if host_info["bk_agent_alive"] != constants.BkAgentStatus.ALIVE:
proxy_hosts.remove({"ip": host_info["ip"], "bk_cloud_id": host_info["bk_cloud_id"]})
if not proxy_hosts:
return
file_name = [file for key in const.FILES_TO_PUSH_TO_PROXY for file in key["files"]]
file_paths = [file for key in constants.FILES_TO_PUSH_TO_PROXY for file in key["files"]]
local_file_md5 = {}
for download_file in file_name:
storage = get_storage()
for download_file in file_paths:
file_path = os.path.join(settings.DOWNLOAD_PATH, download_file)
if os.path.exists(file_path):
local_file_md5.update({download_file: md5sum(file_path)})
else:
try:
local_file_md5.update({download_file: storage.get_file_md5(file_path)})
except FileExistsError:
logger.warning(
f"File ->[{download_file}] not found in download path ->[{settings.DOWNLOAD_PATH}], "
f"please check in order not to affect proxy installation."
)
except FilesNotFoundError:
logger.warning(
f"File ->[{download_file}] not found in path ->"
f"[PROJECT:{settings.BKREPO_PROJECT}, BUCKET: {settings.BKREPO_BUCKET}, PATH: {settings.DOWNLOAD_PATH}]"
f"please check in order not to affect proxy installation."
)

if not local_file_md5:
return
return True

files = [file for file in local_file_md5.keys()]
script = """#!/opt/py36/bin/python
Expand Down Expand Up @@ -99,48 +106,49 @@ def md5(file_name):
"bk_biz_id": settings.BLUEKING_BIZ_ID,
"script_content": base64.b64encode(script.encode()).decode(),
"script_timeout": 300,
"account": const.LINUX_ACCOUNT,
"account_alias": constants.LINUX_ACCOUNT,
"is_param_sensitive": 1,
"script_type": 4,
"script_language": 4,
"target_server": {"ip_list": proxy_hosts},
}

data = client_v2.job.fast_execute_script(kwargs, bk_username=settings.SYSTEM_USE_API_ACCOUNT)
job_instance_id = data["job_instance_id"]
job_instance_id = JobApi.fast_execute_script(kwargs)["job_instance_id"]
time.sleep(5)
client = JobClient(
bk_biz_id=settings.BLUEKING_BIZ_ID, username=settings.SYSTEM_USE_API_ACCOUNT, os_type=const.OsType.LINUX
)
is_finished, task_result = client.poll_task_result(job_instance_id)
if not is_finished or not task_result["success"]:
result = JobDemand.poll_task_result(job_instance_id)
task_result = result["task_result"]
if not result["is_finished"] or not task_result["success"]:
logger.error(f"get proxy files md5 by job failed, msg: {task_result}")
raise Exception(f"get proxy files md5 by job failed, msg: {task_result}")
ip_list = []
for result in task_result["success"]:
proxy_md5 = json.loads(result["log_content"])
for host_result in task_result["success"]:
proxy_md5 = json.loads(host_result["log_content"])
for name, file_md5 in local_file_md5.items():
if name not in proxy_md5 or proxy_md5[name] != file_md5:
ip_list.append({"ip": result["ip"], "bk_cloud_id": result["bk_cloud_id"]})
ip_list.append({"ip": host_result["ip"], "bk_cloud_id": host_result["bk_cloud_id"]})
if not ip_list:
logger.info("There are no files with local differences on all proxy servers")
return
client = JobClient(
bk_biz_id=settings.BLUEKING_BIZ_ID, username=settings.SYSTEM_USE_API_ACCOUNT, os_type=const.OsType.LINUX
)
file_source = [
{
"files": [os.path.join(settings.DOWNLOAD_PATH, file) for file in files],
"account": const.LINUX_ACCOUNT,
"ip_list": [{"ip": settings.BKAPP_LAN_IP, "bk_cloud_id": 0}],
}
]
job_instance_id = client.fast_push_file(
ip_list=ip_list, file_target_path=settings.DOWNLOAD_PATH, file_source=file_source
)

job_transfer_id = storage.fast_transfer_file(
bk_biz_id=settings.BLUEKING_BIZ_ID,
task_name=f"NODEMAN_PUSH_FILE_TO_PROXY_{len(ip_list)}",
timeout=300,
account_alias=constants.LINUX_ACCOUNT,
file_target_path=settings.DOWNLOAD_PATH,
file_source_list=[{"file_list": [os.path.join(settings.DOWNLOAD_PATH, file) for file in files]}],
target_server={"ip_list": ip_list},
)
time.sleep(5)
try:
is_finished, result = client.poll_task_result(job_instance_id)
transfer_result = JobDemand.poll_task_result(job_transfer_id)
if transfer_result["is_finished"]:
logger.info(
f"proxy update file success, hosts: "
f"{transfer_result['task_result']['success']}, job_instance_id:{job_transfer_id}"
)
except JobPollTimeout:
logger.error(f"proxy update file failed. job_instance_id:{job_instance_id}")
if is_finished:
logger.info(f"proxy update file success. job_instance_id:{job_instance_id}")
logger.error(
f"proxy update file failed, pending hosts: "
f"{transfer_result['task_result']['pending']}, failed hosts: {transfer_result['task_result']['failed']}"
f"job_instance_id:{job_transfer_id}"
)
Loading

0 comments on commit 4f64ed7

Please sign in to comment.