Skip to content

Commit

Permalink
feature: 接入点改造 (closed TencentBlueKing#852)
Browse files Browse the repository at this point in the history
  • Loading branch information
CohleRustW committed Dec 9, 2023
1 parent 9319bd4 commit f3d5515
Show file tree
Hide file tree
Showing 11 changed files with 891 additions and 84 deletions.
15 changes: 12 additions & 3 deletions apps/mock_data/common_unit/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@
"ap_type": "system",
"region_id": "test",
"city_id": "test",
"btfileserver": [{"inner_ip": DEFAULT_IP, "outer_ip": DEFAULT_IP}],
"dataserver": [{"inner_ip": DEFAULT_IP, "outer_ip": DEFAULT_IP}],
"taskserver": [{"inner_ip": DEFAULT_IP, "outer_ip": DEFAULT_IP}],
"btfileserver": {
"inner_ips": [{"inner_ip": DEFAULT_IP}],
"outer_ips": [{"outer_ip": DEFAULT_IP}],
},
"dataserver": {
"inner_ips": [{"inner_ip": DEFAULT_IP}],
"outer_ips": [{"outer_ip": DEFAULT_IP}],
},
"taskserver": {
"inner_ips": [{"inner_ip": DEFAULT_IP}],
"outer_ips": [{"outer_ip": DEFAULT_IP}],
},
"zk_hosts": [{"zk_ip": DEFAULT_IP, "zk_port": "2181"}],
"zk_account": "zk_account",
"zk_password": "zk_password",
Expand Down
79 changes: 79 additions & 0 deletions apps/node_man/management/commands/transform_ap_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# coding: utf-8
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import typing

from django.core.management.base import BaseCommand

from apps.node_man import models
from apps.node_man.utils.endpoint import EndPointTransform
from common.log import logger


class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"-e", "--transform", type=bool, required=False, help="AP_ID create from V1 AP_ID", default=False
)
parser.add_argument(
"-l",
"--transform_endpoint_to_leagcy",
action="store_true",
default=False,
help="Clean up the original mapping ID",
)
parser.add_argument(
"-a",
"--all_ap",
action="store_true",
default=False,
help="Transform all the AP_IDs in the database",
)
parser.add_argument(
"-t",
"--transform_ap_id",
type=int,
required=False,
help="Transform target AP_ID in the database",
)

def handle(self, **options):
transform_endpoint_to_leagcy = options.get("transform_endpoint_to_leagcy")
transform = options.get("transform")
if not transform_endpoint_to_leagcy and not transform:
raise ValueError("Please specify the AP_ID to be transformed")
if transform and transform_endpoint_to_leagcy:
raise ValueError("Please specify only one AP_ID to be transformed")

all_ap_transform = options.get("all_ap")
transform_ap_id = options.get("transform_ap_id")
if all_ap_transform and transform_ap_id:
raise ValueError("Please specify only one AP_ID to be transformed")
if not all_ap_transform and not transform_ap_id:
raise ValueError("Please specify the AP_ID to be transformed")

if all_ap_transform:
ap_objects: typing.List[models.AccessPoint] = models.AccessPoint.objects.all()
else:
ap_objects: typing.List[models.AccessPoint] = models.AccessPoint.objects.filter(id=transform_ap_id)

if transform_endpoint_to_leagcy:
transform_func: typing.Callable = EndPointTransform().transform_endpoint_to_leagcy
elif transform:
transform_func: typing.Callable = EndPointTransform().transform
else:
raise ValueError("Please specify the transformation method")

for ap_object in ap_objects:
logger.info(f"Transforming AP_ID: {ap_object.id}")
ap_object.taskserver = transform_func(ap_object.taskserver)
ap_object.dataserver = transform_func(ap_object.dataserver)
ap_object.btfileserver = transform_func(ap_object.btfileserver)
ap_object.save()
26 changes: 26 additions & 0 deletions apps/node_man/migrations/0081_covert_ap_data_20231109_1336.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 3.2.4 on 2023-10-29 05:36

from django.db import migrations, models

from apps.node_man.utils.endpoint import EndPointTransform


def covert_ap_data(apps, schema_editor):
AccessPoint = apps.get_model("node_man", "AccessPoint")
aps = AccessPoint.objects.all()
for ap in aps:
# 转换 gse 地址,从一对一关系,转换为两个列表
ap.btfileserver = EndPointTransform().transform(legacy_endpoints=ap.btfileserver)
ap.dataserver = EndPointTransform().transform(legacy_endpoints=ap.dataserver)
ap.taskserver = EndPointTransform().transform(legacy_endpoints=ap.taskserver)
ap.save()


class Migration(migrations.Migration):
dependencies = [
("node_man", "0080_auto_20231122_1552"),
]

operations = [
migrations.RunPython(covert_ap_data),
]
29 changes: 20 additions & 9 deletions apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ class Meta:


class AccessPoint(models.Model):
class ServersType(object):
OUTER_IPS = "outer_ips"
INNER_IPS = "inner_ips"

name = models.CharField(_("接入点名称"), max_length=255)
ap_type = models.CharField(_("接入点类型"), max_length=255, default="user")
region_id = models.CharField(_("区域id"), max_length=255, default="", blank=True, null=True)
Expand Down Expand Up @@ -548,15 +552,15 @@ class AccessPoint(models.Model):

@property
def file_endpoint_info(self) -> EndpointInfo:
return EndpointInfo(inner_server_infos=self.btfileserver, outer_server_infos=self.btfileserver)
return EndpointInfo(endpoints=self.btfileserver)

@property
def data_endpoint_info(self) -> EndpointInfo:
return EndpointInfo(inner_server_infos=self.dataserver, outer_server_infos=self.dataserver)
return EndpointInfo(endpoints=self.dataserver)

@property
def cluster_endpoint_info(self) -> EndpointInfo:
return EndpointInfo(inner_server_infos=self.taskserver, outer_server_infos=self.taskserver)
return EndpointInfo(endpoints=self.taskserver)

@classmethod
def ap_id_obj_map(cls):
Expand Down Expand Up @@ -881,7 +885,10 @@ class GsePluginDesc(models.Model):
scenario_en = models.TextField(_("英文使用场景"), null=True, blank=True)
category = models.CharField(_("所属范围"), max_length=32, choices=constants.CATEGORY_CHOICES)
launch_node = models.CharField(
_("宿主节点类型要求"), max_length=32, choices=[("agent", "agent"), ("proxy", "proxy"), ("all", "all")], default="all"
_("宿主节点类型要求"),
max_length=32,
choices=[("agent", "agent"), ("proxy", "proxy"), ("all", "all")],
default="all",
)

config_file = models.CharField(_("配置文件名称"), max_length=128, null=True, blank=True)
Expand Down Expand Up @@ -1444,12 +1451,10 @@ class DownloadRecord(models.Model):

@property
def is_finish(self):

return self.task_status == self.TASK_STATUS_FAILED or self.task_status == self.TASK_STATUS_SUCCESS

@property
def is_failed(self):

return self.task_status == self.TASK_STATUS_FAILED

@property
Expand Down Expand Up @@ -1939,7 +1944,7 @@ def get_host_id__bk_obj_sub_map(
host_id__bk_obj_sub_map[proc_status["bk_host_id"]].append(
{
"bk_obj_id": proc_status["bk_obj_id"],
"subscription": exist_subscription_id__obj_map.get(int(proc_status["source_id"]))
"subscription": exist_subscription_id__obj_map.get(int(proc_status["source_id"])),
# "subscription_id": int(proc_status.source_id),
# "name": exist_subscription_id__obj_map.get(int(proc_status.source_id)),
}
Expand Down Expand Up @@ -2195,7 +2200,10 @@ class SubscriptionInstanceRecord(models.Model):
is_latest = models.BooleanField(_("是否为实例最新记录"), default=True, db_index=True)

status = models.CharField(
_("任务状态"), max_length=45, choices=constants.JobStatusType.get_choices(), default=constants.JobStatusType.PENDING
_("任务状态"),
max_length=45,
choices=constants.JobStatusType.get_choices(),
default=constants.JobStatusType.PENDING,
)

@property
Expand Down Expand Up @@ -2344,7 +2352,10 @@ class SubscriptionInstanceStatusDetail(models.Model):
subscription_instance_record_id = models.BigIntegerField(_("订阅实例ID"), db_index=True)
node_id = models.CharField(_("Pipeline原子ID"), max_length=50, default="", blank=True, db_index=True)
status = models.CharField(
_("任务状态"), max_length=45, choices=constants.JobStatusType.get_choices(), default=constants.JobStatusType.RUNNING
_("任务状态"),
max_length=45,
choices=constants.JobStatusType.get_choices(),
default=constants.JobStatusType.RUNNING,
)
log = models.TextField(_("日志内容"))
update_time = models.DateTimeField(_("更新时间"), null=True, blank=True, db_index=True)
Expand Down
14 changes: 3 additions & 11 deletions apps/node_man/periodic_tasks/gse_svr_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
specific language governing permissions and limitations under the License.
"""
from telnetlib import Telnet
from typing import Any, Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple

from celery.task import periodic_task
from django.conf import settings
Expand All @@ -21,7 +21,6 @@


def check_ip_ports_reachable(host: str, ports: List[int]) -> bool:

for port in ports:
try:
with Telnet(host=host, port=port, timeout=2):
Expand All @@ -33,7 +32,6 @@ def check_ip_ports_reachable(host: str, ports: List[int]) -> bool:


class ZkSafeClient:

zk_client: Optional[KazooClient]

def __init__(self, hosts: str, auth_data: List[Tuple[str, str]], **kwargs):
Expand Down Expand Up @@ -110,15 +108,9 @@ def gse_svr_discovery_periodic_task():
continue
logger.info(f"zk_node_path -> {zk_node_path}, svr_ips -> {svr_ips}")

inner_ip__outer_ip_map: Dict[str, str] = {}
for svr_info in getattr(ap, ap_field, []):
inner_ip__outer_ip_map[svr_info.get("inner_ip")] = svr_info.get("outer_ip")
outer_ips = getattr(ap, ap_field, []).get("outer_ips" or [])

svr_infos: List[Dict[str, Any]] = []
for svr_ip in svr_ips:
# svr_ip 通常解析为内网IP,外网IP允许自定义,如果为空再取 svr_ip
outer_ip = inner_ip__outer_ip_map.get(svr_ip) or svr_ip
svr_infos.append({"inner_ip": svr_ip, "outer_ip": outer_ip})
svr_infos = {"inner_ips": [{"inner_ip": inner_ip} for inner_ip in svr_ips], "outer_ips": outer_ips}
setattr(ap, ap_field, svr_infos)
is_change = True
if is_change:
Expand Down
68 changes: 53 additions & 15 deletions apps/node_man/serializers/ap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from typing import List
import typing

from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
Expand Down Expand Up @@ -84,28 +84,66 @@ class UpdateOrCreateSerializer(serializers.ModelSerializer):
"""

class ServersSerializer(serializers.Serializer):
inner_ip = serializers.CharField(label=_("内网IP"), required=False)
inner_ipv6 = serializers.CharField(label=_("内网IPv6"), required=False)
outer_ip = serializers.CharField(label=_("外网IP"), required=False)
outer_ipv6 = serializers.CharField(label=_("外网IPv6"), required=False)
bk_host_id = serializers.IntegerField(label=_("主机ID"), required=False)
class ServerInnerSerializer(serializers.Serializer):
inner_ip = serializers.CharField(label=_("内网IP"), required=False)
inner_ipv6 = serializers.CharField(label=_("内网IPv6"), required=False)
bk_host_id = serializers.IntegerField(label=_("主机ID"), required=False)

def validate(self, attrs):
inner_ip = attrs.get("inner_ip")
inner_ipv6 = attrs.get("inner_ipv6")
if not attrs.get("inner_ip") and not attrs.get("inner_ipv6") and not attrs.get("bk_host_id"):
raise ValidationError(_("请求参数必须包含 inner_ip inner_ipv6 or bk_host_id"))
if inner_ip and not basic.is_v4(inner_ip):
raise ValidationError(_("请求参数 inner_ip 不是有效的IP地址"))
if inner_ipv6 and not basic.is_v6(inner_ipv6):
raise ValidationError(_("请求参数 inner_ipv6 不是有效的IP 地址"))
return attrs

class ServerOuterSerializer(serializers.Serializer):
outer_ip = serializers.CharField(label=_("外网IP"), required=False)
outer_ipv6 = serializers.CharField(label=_("外网IPv6"), required=False)
bk_host_id = serializers.IntegerField(label=_("主机ID"), required=False)

def validate(self, attrs):
outer_ip = attrs.get("outer_ip")
outer_ipv6 = attrs.get("outer_ipv6")
if not attrs.get("outer_ip") and not attrs.get("outer_ipv6") and not attrs.get("bk_host_id"):
raise ValidationError(_("请求参数必须包含 outer_ip outer_ipv6 or bk_host_id"))
if outer_ip and not basic.is_v4(outer_ip):
raise ValidationError(_("请求参数 outer_ip 不是有效的IP地址"))
if outer_ipv6 and not basic.is_v6(outer_ipv6):
raise ValidationError(_("请求参数 outer_ipv6 不是有效的IP 地址"))
return attrs

inner_ips = serializers.ListField(label=_("内网IP信息"), required=False, child=ServerInnerSerializer())
outer_ips = serializers.ListField(label=_("外网IP信息"), required=False, child=ServerOuterSerializer())

def validate(self, attrs):
basic.ipv6_formatter(data=attrs, ipv6_field_names=["inner_ipv6", "outer_ipv6"])
if not attrs.get("inner_ips") and not attrs.get("outer_ips"):
raise ValidationError(_("请求参数 inner_ips, outer_ips 不可同时为空"))

for attr in attrs.keys():
attrs[attr] = list({frozenset(d.items()): d for d in attrs[attr]}.values())

# 检查是否同时包含 ipv4 和 ipv6
for attr in attrs.keys():
v4_attr = attr.replace("ips", "ip")
v6_attr = attr.replace("ips", "ipv6")
v4_items = [item for item in attrs[attr] if item.get(v4_attr)]
v6_items = [item for item in attrs[attr] if item.get(v6_attr)]
if v4_items and v6_items:
raise ValidationError(_(f"{attr} 中不能同时包括 {v4_attr}{v6_attr}"))

if not (attrs.get("inner_ip") or attrs.get("inner_ipv6")):
raise ValidationError(_("请求参数 inner_ip 和 inner_ipv6 不能同时为空"))
if not (attrs.get("outer_ip") or attrs.get("outer_ipv6")):
raise ValidationError(_("请求参数 outer_ip 和 outer_ipv6 不能同时为空"))
return attrs

class ZKSerializer(serializers.Serializer):
zk_ip = serializers.CharField(label=_("ZK IP地址"))
zk_port = serializers.CharField(label=_("ZK 端口"))

btfileserver = serializers.ListField(child=ServersSerializer())
dataserver = serializers.ListField(child=ServersSerializer())
taskserver = serializers.ListField(child=ServersSerializer())
btfileserver = ServersSerializer()
dataserver = ServersSerializer()
taskserver = ServersSerializer()
zk_hosts = serializers.ListField(child=ZKSerializer())
zk_account = serializers.CharField(label=_("ZK账号"), required=False, allow_blank=True)
zk_password = serializers.CharField(label=_("ZK密码"), required=False, allow_blank=True)
Expand All @@ -119,7 +157,7 @@ class ZKSerializer(serializers.Serializer):
callback_url = serializers.CharField(label=_("节点管理内网回调地址"), required=False, allow_blank=True)

def validate(self, data):
gse_version_list: List[str] = list(set(AccessPoint.objects.values_list("gse_version", flat=True)))
gse_version_list: typing.List[str] = list(set(AccessPoint.objects.values_list("gse_version", flat=True)))
# 存量接入点版本全部为V2新建/更新版本也为V2版本
if GseVersion.V1.value not in gse_version_list:
data["gse_version"] = GseVersion.V2.value
Expand Down
13 changes: 8 additions & 5 deletions apps/node_man/tests/test_pericdic_tasks/mock_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@
"/gse/config/server/task/all": ["127.0.0.1", "127.0.0.2", "127.0.0.3"],
"/gse/config/server/btfiles/all": ["127.0.0.1", "127.0.0.2", "127.0.0.3"],
}
MOCK_AP_FIELD_MAP = [
{"inner_ip": "127.0.0.1", "outer_ip": "127.0.0.1"},
{"inner_ip": "127.0.0.2", "outer_ip": "127.0.0.2"},
{"inner_ip": "127.0.0.3", "outer_ip": "127.0.0.3"},
]
MOCK_AP_FIELD_MAP = {
"inner_ips": [
{"inner_ip": "127.0.0.1"},
{"inner_ip": "127.0.0.2"},
{"inner_ip": "127.0.0.3"},
],
"outer_ips": [{"outer_ip": ""}],
}
Loading

0 comments on commit f3d5515

Please sign in to comment.