Skip to content

Commit

Permalink
feature: 接入点改造 (closed TencentBlueKing#852)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW committed Mar 4, 2024
1 parent 3879585 commit 7f36781
Show file tree
Hide file tree
Showing 13 changed files with 1,047 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def get_default_case_name(cls) -> str:
def setUpTestData(cls):
super().setUpTestData()
host = models.Host.objects.all()[0]
models.AccessPoint.objects.all().update(btfileserver=[{"inner_ip": host.inner_ip, "outer_ip": host.outer_ip}])
models.AccessPoint.objects.all().update(
btfileserver={"inner_ip_infos": [{"ip": host.inner_ip}], "outer_ip_infos": [{"ip": host.outer_ip}]}
)
host_data = copy.deepcopy(common_unit.host.HOST_MODEL_DATA)
host_data.update({"bk_host_id": cls.obj_factory.RANDOM_BEGIN_HOST_ID + cls.obj_factory.total_host_num})
cls.obj_factory.bulk_create_model(models.Host, [host_data])
Expand Down
15 changes: 12 additions & 3 deletions apps/mock_data/common_unit/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@
"ap_type": "system",
"region_id": "test",
"city_id": "test",
"btfileserver": [{"inner_ip": DEFAULT_IP, "outer_ip": DEFAULT_IP}],
"dataserver": [{"inner_ip": DEFAULT_IP, "outer_ip": DEFAULT_IP}],
"taskserver": [{"inner_ip": DEFAULT_IP, "outer_ip": DEFAULT_IP}],
"btfileserver": {
"inner_ip_infos": [{"ip": DEFAULT_IP}],
"outer_ip_infos": [{"ip": DEFAULT_IP}],
},
"dataserver": {
"inner_ip_infos": [{"ip": DEFAULT_IP}],
"outer_ip_infos": [{"ip": DEFAULT_IP}],
},
"taskserver": {
"inner_ip_infos": [{"ip": DEFAULT_IP}],
"outer_ip_infos": [{"ip": DEFAULT_IP}],
},
"zk_hosts": [{"zk_ip": DEFAULT_IP, "zk_port": "2181"}],
"zk_account": "zk_account",
"zk_password": "zk_password",
Expand Down
86 changes: 86 additions & 0 deletions apps/node_man/management/commands/transform_ap_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# coding: utf-8
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import typing

from django.core.management.base import BaseCommand, CommandError

from apps.node_man import models
from apps.node_man.utils.endpoint import EndPointTransform
from common.log import logger


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"-e",
"--transform",
required=False,
help="AP_ID create from V1 AP_ID",
default=False,
action="store_true",
)
parser.add_argument(
"-l",
"--transform_endpoint_to_legacy",
action="store_true",
default=False,
help="Clean up the original mapping ID",
)
parser.add_argument(
"-a",
"--all_ap",
action="store_true",
default=False,
help="Transform all the AP_IDs in the database",
)
parser.add_argument(
"-t",
"--transform_ap_id",
required=False,
help="Transform target AP_ID in the database",
)

def handle(self, **options):
transform_endpoint_to_legacy = options.get("transform_endpoint_to_legacy")
transform = options.get("transform")
if not transform_endpoint_to_legacy and not transform:
raise CommandError("Please specify the AP_ID to be transformed")
if transform and transform_endpoint_to_legacy:
raise CommandError("Please specify only one AP_ID to be transformed")

all_ap_transform = options.get("all_ap")
transform_ap_id = options.get("transform_ap_id")
if all_ap_transform and transform_ap_id:
raise CommandError("Please specify only one AP_ID to be transformed")
if not all_ap_transform and not transform_ap_id:
raise CommandError("Please specify the AP_ID to be transformed")

if all_ap_transform:
ap_objects: typing.List[models.AccessPoint] = models.AccessPoint.objects.all()
else:
ap_objects: typing.List[models.AccessPoint] = models.AccessPoint.objects.filter(id=transform_ap_id)

if transform_endpoint_to_legacy:
transform_func: typing.Callable = EndPointTransform().transform_endpoint_to_legacy
elif transform:
transform_func: typing.Callable = EndPointTransform().transform
else:
raise CommandError("Please specify the transformation method")

for ap_object in ap_objects:
logger.info(f"Transforming AP_ID: {ap_object.id}")
try:
ap_object.taskserver = transform_func(ap_object.taskserver)
ap_object.dataserver = transform_func(ap_object.dataserver)
ap_object.btfileserver = transform_func(ap_object.btfileserver)
ap_object.save()
except Exception as e:
raise CommandError(f"Failed to transform AP_ID: {ap_object.id}, error: {e}")
26 changes: 26 additions & 0 deletions apps/node_man/migrations/0081_covert_ap_data_20231109_1336.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 3.2.4 on 2023-10-29 05:36

from django.db import migrations, models

from apps.node_man.utils.endpoint import EndPointTransform


def covert_ap_data(apps, schema_editor):
AccessPoint = apps.get_model("node_man", "AccessPoint")
aps = AccessPoint.objects.all()
for ap in aps:
# 转换 gse 地址,从一对一关系,转换为两个列表
ap.btfileserver = EndPointTransform().transform(legacy_endpoints=ap.btfileserver)
ap.dataserver = EndPointTransform().transform(legacy_endpoints=ap.dataserver)
ap.taskserver = EndPointTransform().transform(legacy_endpoints=ap.taskserver)
ap.save()


class Migration(migrations.Migration):
dependencies = [
("node_man", "0080_auto_20231122_1552"),
]

operations = [
migrations.RunPython(covert_ap_data),
]
54 changes: 37 additions & 17 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,15 +550,21 @@ class AccessPoint(models.Model):

@property
def file_endpoint_info(self) -> EndpointInfo:
return EndpointInfo(inner_server_infos=self.btfileserver, outer_server_infos=self.btfileserver)
return EndpointInfo(
inner_ip_infos=self.btfileserver["inner_ip_infos"], outer_ip_infos=self.btfileserver["outer_ip_infos"]
)

@property
def data_endpoint_info(self) -> EndpointInfo:
return EndpointInfo(inner_server_infos=self.dataserver, outer_server_infos=self.dataserver)
return EndpointInfo(
inner_ip_infos=self.dataserver["inner_ip_infos"], outer_ip_infos=self.dataserver["outer_ip_infos"]
)

@property
def cluster_endpoint_info(self) -> EndpointInfo:
return EndpointInfo(inner_server_infos=self.taskserver, outer_server_infos=self.taskserver)
return EndpointInfo(
inner_ip_infos=self.taskserver["inner_ip_infos"], outer_ip_infos=self.taskserver["outer_ip_infos"]
)

@classmethod
def ap_id_obj_map(cls):
Expand Down Expand Up @@ -622,12 +628,18 @@ def test(cls, params: dict):
接入点可用性测试
:param params: Dict
{
"servers": [
{
"inner_ip": "127.0.0.1",
"outer_ip": "127.0.0.2"
}
],
"btfileserver": {
"inner_ip_infos": [{"ip": "127.0.0.1"}],
"outer_ip_infos": [{"ip": "127.0.0.2"}]
},
"taskserver": {
"inner_ip_infos": [{"ip": "127.0.0.1"}],
"outer_ip_infos": [{"ip": "127.0.0.2"}]
},
"dataserver": {
"inner_ip_infos": [{"ip": "127.0.0.1"}],
"outer_ip_infos": [{"ip": "127.0.0.2"}]
},
"package_inner_url": "http://127.0.0.1/download/",
"package_outer_url": "http://127.0.0.2/download/"
}
Expand Down Expand Up @@ -692,9 +704,10 @@ def _check_callback_url(url: str, _logs: list):

test_logs = []
detect_hosts: Set[str] = set()
for server in params.get("btfileserver", []) + params.get("dataserver", []) + params.get("taskserver", []):
detect_hosts.add(server.get("inner_ip") or server.get("inner_ipv6"))

for server_type in ["btfileserver", "dataserver", "taskserver"]:
for ip_info in params[server_type]["inner_ip_infos"]:
detect_hosts.add(ip_info.get("ip"))
with ThreadPoolExecutor(max_workers=settings.CONCURRENT_NUMBER) as ex:
tasks = [ex.submit(_check_ip, detect_host, test_logs) for detect_host in detect_hosts]
tasks.append(ex.submit(_check_package_url, params["package_inner_url"], test_logs))
Expand Down Expand Up @@ -883,7 +896,10 @@ class GsePluginDesc(models.Model):
scenario_en = models.TextField(_("英文使用场景"), null=True, blank=True)
category = models.CharField(_("所属范围"), max_length=32, choices=constants.CATEGORY_CHOICES)
launch_node = models.CharField(
_("宿主节点类型要求"), max_length=32, choices=[("agent", "agent"), ("proxy", "proxy"), ("all", "all")], default="all"
_("宿主节点类型要求"),
max_length=32,
choices=[("agent", "agent"), ("proxy", "proxy"), ("all", "all")],
default="all",
)

config_file = models.CharField(_("配置文件名称"), max_length=128, null=True, blank=True)
Expand Down Expand Up @@ -1446,12 +1462,10 @@ class DownloadRecord(models.Model):

@property
def is_finish(self):

return self.task_status == self.TASK_STATUS_FAILED or self.task_status == self.TASK_STATUS_SUCCESS

@property
def is_failed(self):

return self.task_status == self.TASK_STATUS_FAILED

@property
Expand Down Expand Up @@ -1941,7 +1955,7 @@ def get_host_id__bk_obj_sub_map(
host_id__bk_obj_sub_map[proc_status["bk_host_id"]].append(
{
"bk_obj_id": proc_status["bk_obj_id"],
"subscription": exist_subscription_id__obj_map.get(int(proc_status["source_id"]))
"subscription": exist_subscription_id__obj_map.get(int(proc_status["source_id"])),
# "subscription_id": int(proc_status.source_id),
# "name": exist_subscription_id__obj_map.get(int(proc_status.source_id)),
}
Expand Down Expand Up @@ -2197,7 +2211,10 @@ class SubscriptionInstanceRecord(models.Model):
is_latest = models.BooleanField(_("是否为实例最新记录"), default=True, db_index=True)

status = models.CharField(
_("任务状态"), max_length=45, choices=constants.JobStatusType.get_choices(), default=constants.JobStatusType.PENDING
_("任务状态"),
max_length=45,
choices=constants.JobStatusType.get_choices(),
default=constants.JobStatusType.PENDING,
)

@property
Expand Down Expand Up @@ -2346,7 +2363,10 @@ class SubscriptionInstanceStatusDetail(models.Model):
subscription_instance_record_id = models.BigIntegerField(_("订阅实例ID"), db_index=True)
node_id = models.CharField(_("Pipeline原子ID"), max_length=50, default="", blank=True, db_index=True)
status = models.CharField(
_("任务状态"), max_length=45, choices=constants.JobStatusType.get_choices(), default=constants.JobStatusType.RUNNING
_("任务状态"),
max_length=45,
choices=constants.JobStatusType.get_choices(),
default=constants.JobStatusType.RUNNING,
)
log = models.TextField(_("日志内容"))
update_time = models.DateTimeField(_("更新时间"), null=True, blank=True, db_index=True)
Expand Down
19 changes: 8 additions & 11 deletions apps/node_man/periodic_tasks/gse_svr_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
specific language governing permissions and limitations under the License.
"""
from telnetlib import Telnet
from typing import Any, Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple

from celery.task import periodic_task
from django.conf import settings
Expand All @@ -21,7 +21,6 @@


def check_ip_ports_reachable(host: str, ports: List[int]) -> bool:

for port in ports:
try:
with Telnet(host=host, port=port, timeout=2):
Expand All @@ -33,7 +32,6 @@ def check_ip_ports_reachable(host: str, ports: List[int]) -> bool:


class ZkSafeClient:

zk_client: Optional[KazooClient]

def __init__(self, hosts: str, auth_data: List[Tuple[str, str]], **kwargs):
Expand Down Expand Up @@ -110,16 +108,15 @@ def gse_svr_discovery_periodic_task():
continue
logger.info(f"zk_node_path -> {zk_node_path}, svr_ips -> {svr_ips}")

inner_ip__outer_ip_map: Dict[str, str] = {}
for svr_info in getattr(ap, ap_field, []):
inner_ip__outer_ip_map[svr_info.get("inner_ip")] = svr_info.get("outer_ip")
outer_ips = getattr(ap, ap_field, {}).get("outer_ip_infos") or {}
inner_ip_infos: List[Dict[str, str]] = [{"ip": inner_ip} for inner_ip in svr_ips]

svr_infos: List[Dict[str, Any]] = []
for svr_ip in svr_ips:
# svr_ip 通常解析为内网IP,外网IP允许自定义,如果为空再取 svr_ip
outer_ip = inner_ip__outer_ip_map.get(svr_ip) or svr_ip
svr_infos.append({"inner_ip": svr_ip, "outer_ip": outer_ip})
svr_infos: Dict[str, List[Dict[str, str]]] = {
"inner_ip_infos": inner_ip_infos,
"outer_ip_infos": outer_ips if outer_ips else inner_ip_infos,
}
setattr(ap, ap_field, svr_infos)
logger.info(f"update ap -> {ap}, {ap_field} -> {svr_infos}")
is_change = True
if is_change:
ap.save()
Loading

0 comments on commit 7f36781

Please sign in to comment.