Skip to content

Commit

Permalink
fix(backend): 单据标准化协议修改 #7747
Browse files Browse the repository at this point in the history
  • Loading branch information
iSecloud committed Nov 29, 2024
1 parent 55ba7eb commit dcfef61
Show file tree
Hide file tree
Showing 40 changed files with 326 additions and 339 deletions.
3 changes: 3 additions & 0 deletions dbm-ui/backend/components/dbresource/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ def __init__(self):
self.resource_summary = self.generate_data_api(
method="POST", url="/statistic/summary", description=_("按照条件聚合资源统计")
)
self.resource_label_count = self.generate_data_api(
method="POST", url="/groupby/label/count", description=_("按照标签统计资源数量")
)


DBResourceApi = _DBResourceApi()
182 changes: 7 additions & 175 deletions dbm-ui/backend/db_dirty/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,18 @@
"""
import itertools
import logging
from collections import defaultdict
from typing import Any, Dict, List
from typing import List

from django.utils.translation import ugettext as _

from backend import env
from backend.components import CCApi
from backend.configuration.constants import SystemSettingsEnum
from backend.configuration.models import SystemSettings
from backend.db_dirty.constants import MachineEventType, PoolType
from backend.db_dirty.exceptions import PoolTransferException
from backend.db_dirty.models import DirtyMachine, MachineEvent
from backend.db_meta.models import AppCache
from backend.db_services.ipchooser.constants import IDLE_HOST_MODULE
from backend.db_services.ipchooser.handlers.topo_handler import TopoHandler
from backend.db_services.ipchooser.query.resource import ResourceQueryHelper
from backend.flow.consts import FAILED_STATES
from backend.flow.utils.cc_manage import CcManage
from backend.ticket.builders import BuilderFactory
from backend.ticket.builders.common.base import fetch_apply_hosts
from backend.ticket.models import Flow, Ticket
from backend.utils.batch_request import request_multi_thread

logger = logging.getLogger("root")

Expand Down Expand Up @@ -67,172 +57,14 @@ def transfer_hosts_to_pool(cls, operator: str, bk_host_ids: List[int], source: P
raise PoolTransferException(_("{}--->{}转移不合法").format(source, target))

@classmethod
def query_dirty_machine_records(cls, bk_host_ids: List[int]):
"""
查询污点池主机信息 TODO: 污点池废弃,代码将被移除
@param bk_host_ids: 主机列表
"""

def get_module_data(data):
params, res = data
params = params["params"]
return [{"bk_biz_id": params["bk_biz_id"], **d} for d in res]

if not bk_host_ids:
return []

# 如果传入的列表已经是DirtyMachine,则直接用
if not isinstance(bk_host_ids[0], DirtyMachine):
dirty_machines = DirtyMachine.objects.filter(bk_host_id__in=bk_host_ids)
else:
dirty_machines = bk_host_ids
bk_host_ids = [dirty.bk_host_id for dirty in dirty_machines]

# 缓存云区域和业务信息
bk_biz_ids = [dirty_machine.bk_biz_id for dirty_machine in dirty_machines]
for_biz_infos = AppCache.batch_get_app_attr(bk_biz_ids=bk_biz_ids, attr_name="bk_biz_name")
cloud_info = ResourceQueryHelper.search_cc_cloud(get_cache=True)

# 查询污点主机当前所处的模块
host_topo_infos = CCApi.find_host_biz_relations(params={"bk_host_id": bk_host_ids})
host__topo_info_map: Dict[int, List] = defaultdict(list)
biz__modules_map: Dict[int, List] = defaultdict(list)
for topo in host_topo_infos:
host__topo_info_map[topo["bk_host_id"]].append(topo)
biz__modules_map[topo["bk_biz_id"]].append(topo["bk_module_id"])
# 批量获取业务下模块信息
module_infos = request_multi_thread(
func=CCApi.find_module_batch,
params_list=[
{
"params": {"bk_biz_id": biz, "bk_ids": modules, "fields": ["bk_module_id", "bk_module_name"]},
"use_admin": True,
}
for biz, modules in biz__modules_map.items()
],
get_data=get_module_data,
in_order=True,
)
module_infos = list(itertools.chain(*module_infos))
biz__module__module_name: Dict[int, Dict[int, str]] = defaultdict(dict)
for info in module_infos:
biz__module__module_name[info["bk_biz_id"]][info["bk_module_id"]] = info["bk_module_name"]

# 获取污点池模块
system_manage_topo = SystemSettings.get_setting_value(key=SystemSettingsEnum.MANAGE_TOPO.value)
dirty_module = system_manage_topo["dirty_module_id"]

# 获取污点池列表信息
dirty_machine_list: List[Dict] = []
for dirty in dirty_machines:
# 填充污点池主机基础信息
dirty_machine_info = {
"ip": dirty.ip,
"bk_host_id": dirty.bk_host_id,
"bk_cloud_name": cloud_info[str(dirty.bk_cloud_id)]["bk_cloud_name"],
"bk_cloud_id": dirty.bk_cloud_id,
"bk_biz_name": for_biz_infos[int(dirty.bk_biz_id)],
"bk_biz_id": dirty.bk_biz_id,
"ticket_type": dirty.ticket.ticket_type,
"ticket_id": dirty.ticket.id,
"ticket_type_display": dirty.ticket.get_ticket_type_display(),
"task_id": dirty.flow.flow_obj_id,
"operator": dirty.ticket.creator,
"is_dirty": True,
}

# 如果主机已经不存在于cc,则仅能删除记录
if dirty.bk_host_id not in host__topo_info_map:
dirty_machine_info.update(is_dirty=False)
dirty_machine_list.append(dirty_machine_info)
continue

# 补充主机所在的模块信息
host_in_module = [
{
"bk_module_id": h["bk_module_id"],
"bk_module_name": biz__module__module_name[h["bk_biz_id"]].get(h["bk_module_id"], ""),
}
for h in host__topo_info_map[dirty.bk_host_id]
]
dirty_machine_info.update(bk_module_infos=host_in_module)

# 如果主机 不处于/不仅仅处于【污点池】中,则不允许移入待回收
host = host__topo_info_map[dirty.bk_host_id][0]
if len(host__topo_info_map[dirty.bk_host_id]) > 1:
dirty_machine_info.update(is_dirty=False)
elif host["bk_biz_id"] != env.DBA_APP_BK_BIZ_ID or host["bk_module_id"] != dirty_module:
dirty_machine_info.update(is_dirty=False)

dirty_machine_list.append(dirty_machine_info)

dirty_machine_list.sort(key=lambda x: x["ticket_id"], reverse=True)
return dirty_machine_list

@classmethod
def insert_dirty_machines(cls, bk_biz_id: int, bk_host_ids: List[Dict[str, Any]], ticket: Ticket, flow: Flow):
def handle_dirty_machine(cls, ticket_id, root_id, origin_tree_status, target_tree_status):
"""
将机器导入到污点池中 TODO: 污点池废弃,代码将被移除
@param bk_biz_id: 业务ID
@param bk_host_ids: 主机列表
@param ticket: 关联的单据
@param flow: 关联的flow任务
处理执行失败/重试成功涉及的污点池机器
@param ticket_id: 单据ID
@param root_id: 流程ID
@param origin_tree_status: 流程源状态
@param target_tree_status: 流程目标状态
"""
# 查询污点机器信息
host_property_filter = {
"condition": "AND",
"rules": [{"field": "bk_host_id", "operator": "in", "value": bk_host_ids}],
}
dirty_host_infos = CCApi.list_hosts_without_biz(
{
# 默认一次性录入的机器不会超过500
"page": {"start": 0, "limit": 500, "sort": "bk_host_id"},
"host_property_filter": host_property_filter,
"fields": ["bk_host_id", "bk_cloud_id", "bk_host_innerip"],
},
use_admin=True,
)["info"]

# 获取业务空闲机模块,资源池模块和污点池模块
idle_module = CcManage(bk_biz_id, "").get_biz_internal_module(bk_biz_id)[IDLE_HOST_MODULE]["bk_module_id"]
system_manage_topo = SystemSettings.get_setting_value(key=SystemSettingsEnum.MANAGE_TOPO.value)
resource_module, dirty_module = system_manage_topo["resource_module_id"], system_manage_topo["dirty_module_id"]
# 获取主机的拓扑信息(注:这里不能带上业务信息,因为主机可能转移业务)
host_topo_infos = TopoHandler.query_host_set_module(bk_host_ids=bk_host_ids)["hosts_topo_info"]
# 将污点机器信息转移至DBA污点池模(如果污点机器不在空闲机/资源池,则放弃转移,认为已到正确拓扑)
transfer_host_ids = [
info["bk_host_id"]
for info in host_topo_infos
if not set(info["bk_module_ids"]) - {resource_module, idle_module}
]
if transfer_host_ids:
update_host_properties = {"dbm_meta": [], "need_monitor": False, "update_operator": False}
CcManage(bk_biz_id=env.DBA_APP_BK_BIZ_ID, cluster_type="").transfer_host_module(
transfer_host_ids, target_module_ids=[dirty_module], update_host_properties=update_host_properties
)

# 录入污点池表中
exist_dirty_machine_ids = list(
DirtyMachine.objects.filter(bk_host_id__in=bk_host_ids).values_list("bk_host_id", flat=True)
)
DirtyMachine.objects.bulk_create(
[
DirtyMachine(
ticket=ticket,
flow=flow,
ip=host["bk_host_innerip"],
bk_biz_id=bk_biz_id,
bk_host_id=host["bk_host_id"],
bk_cloud_id=host["bk_cloud_id"],
)
for host in dirty_host_infos
if host["bk_host_id"] not in exist_dirty_machine_ids
]
)

@classmethod
def handle_dirty_machine(cls, ticket_id, root_id, origin_tree_status, target_tree_status):
"""处理执行失败/重试成功涉及的污点池机器"""
if (origin_tree_status not in FAILED_STATES) and (target_tree_status not in FAILED_STATES):
return

Expand Down
21 changes: 12 additions & 9 deletions dbm-ui/backend/db_dirty/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ def hosts_pool_transfer(cls, bk_biz_id, hosts, pool, operator="", ticket=None):
host_ids = [host["bk_host_id"] for host in hosts]

# 主机转入污点/故障池,说明第一次被纳管到池
if pool in [PoolType.Fault, PoolType.Dirty]:
hosts_pool = [
cls(bk_biz_id=bk_biz_id, pool=pool, ticket=ticket, creator=operator, updater=operator, **host)
for host in hosts
]
cls.objects.bulk_create(hosts_pool)
# 待回收只会从故障池转移
elif pool == PoolType.Recycle:
cls.objects.filter(bk_host_id__in=host_ids).update(pool=pool, ticket=ticket)
# 待回收会从故障池、资源池转移
# 因此这里判断主机不存在就创建,否则更新
if pool in [PoolType.Fault, PoolType.Dirty, PoolType.Recycle]:
handle_hosts = cls.objects.filter(bk_host_id__in=host_ids)
if handle_hosts.count() == len(host_ids):
handle_hosts.update(pool=pool, ticket=ticket)
else:
handle_hosts = [
cls(bk_biz_id=bk_biz_id, pool=pool, ticket=ticket, creator=operator, updater=operator, **host)
for host in hosts
]
cls.objects.bulk_create(handle_hosts)
# 回收机器只能从待回收转移,删除池纳管记录
# 重导入回资源池,删除池纳管记录
elif pool in [PoolType.Recycled, PoolType.Resource]:
Expand Down
6 changes: 4 additions & 2 deletions dbm-ui/backend/db_meta/api/machine/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,16 @@ def clear_info_for_machine(machines: Optional[List]):

# 清理proxy相关信息
for p in proxys:
p.tendbclusterspiderext.delete()
if hasattr(p, "tendbclusterspiderext"):
p.tendbclusterspiderext.delete()
p.delete(keep_parents=True)

# 清理storage相关信息
for s in storages:
for info in StorageInstanceTuple.objects.filter(Q(ejector=s) | Q(receiver=s)):
# 先删除额外关联信息,否则会报ProtectedError 异常
info.tendbclusterstorageset.delete()
if hasattr(info, "tendbclusterstorageset"):
info.tendbclusterstorageset.delete()
info.delete()
s.delete(keep_parents=True)
machine.delete(keep_parents=True)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
class Migration(migrations.Migration):

dependencies = [
("db_meta", "0043_auto_20241014_1042"),
("db_meta", "0044_deviceclass"),
("db_meta", "0043_auto_20241015_2128"),
]

Expand Down
15 changes: 10 additions & 5 deletions dbm-ui/backend/db_meta/models/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ def __str__(self):

def to_dict(self):
"""将集群所有字段转为字段"""
return {**model_to_dict(self), "cluster_type_name": str(ClusterType.get_choice_label(self.cluster_type))}
return {
**model_to_dict(self, exclude=["tags"]),
"cluster_type_name": str(ClusterType.get_choice_label(self.cluster_type)),
}

@property
def simple_desc(self):
Expand Down Expand Up @@ -402,16 +405,18 @@ def enable_dbha(self):
self.refresh_from_db()

@classmethod
def get_cluster_related_machines(cls, cluster_ids: List[int]) -> List:
def get_cluster_related_machines(cls, cluster_ids: List[int], role: str = None) -> List:
"""
通过集群id查询集群关联的所有主机信息,即实例所在的主机
"""
from backend.db_meta.models import Machine

clusters = Cluster.objects.filter(id__in=cluster_ids)
host_ids = set(clusters.values_list("storageinstance__machine__bk_host_id", flat=True)) | set(
clusters.values_list("proxyinstance__machine__bk_host_id", flat=True)
)
host_ids = set()
if not role or role == "backend":
host_ids |= set(clusters.values_list("storageinstance__machine__bk_host_id", flat=True))
if not role or role == "proxy":
host_ids |= set(clusters.values_list("proxyinstance__machine__bk_host_id", flat=True))
machines = Machine.objects.filter(bk_host_id__in=host_ids)
return machines

Expand Down
6 changes: 5 additions & 1 deletion dbm-ui/backend/db_package/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
SyncMediumSerializer,
UploadPackageSerializer,
)
from backend.exceptions import ApiRequestError
from backend.flow.consts import MediumEnum
from backend.iam_app.dataclass import ResourceEnum
from backend.iam_app.dataclass.actions import ActionEnum
Expand Down Expand Up @@ -168,7 +169,10 @@ def partial_update(self, request, *args, **kwargs):
)
def destroy(self, request, *args, **kwargs):
# 删除制品库文件
StorageHandler().delete_file(self.get_object().path)
try:
StorageHandler().delete_file(self.get_object().path)
except ApiRequestError as e:
logger.error(_("文件删除异常,错误信息: {}").format(e))
# 删除本地记录
super().destroy(request, *args, **kwargs)
return Response()
Expand Down
9 changes: 8 additions & 1 deletion dbm-ui/backend/db_services/dbresource/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from backend.db_meta.enums import InstanceRole
from backend.db_meta.enums.spec import SpecClusterType, SpecMachineType
from backend.db_meta.models import Spec
from backend.db_meta.models.machine import DeviceClass
from backend.db_meta.models.machine import DeviceClass, Machine
from backend.db_services.dbresource import mock
from backend.db_services.dbresource.constants import ResourceGroupByEnum, ResourceOperation
from backend.db_services.dbresource.mock import (
Expand Down Expand Up @@ -47,6 +47,13 @@ class HostInfoSerializer(serializers.Serializer):
hosts = serializers.ListSerializer(help_text=_("主机"), child=HostInfoSerializer())
labels = serializers.ListField(help_text=_("标签"), child=serializers.CharField(), required=False)

def validate(self, attrs):
# 如果主机存在源数据,则拒绝导入
host_ids = [host["host_id"] for host in attrs["hosts"]]
exist_hosts = list(Machine.objects.filter(bk_host_id__in=host_ids).values_list("ip", flat=True))
if exist_hosts:
raise serializers.ValidationError(_("导入主机{}存在元数据,请检查后重新导入").format(exist_hosts))


class ResourceApplySerializer(serializers.Serializer):
class HostDetailSerializer(serializers.Serializer):
Expand Down
16 changes: 12 additions & 4 deletions dbm-ui/backend/db_services/mysql/fixpoint_rollback/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def query_fixpoint_log(self, requests, *args, **kwargs):
limit, offset = validated_data["limit"], validated_data["offset"]

# 查询目前定点回档临时集群
tmp_clusters = Cluster.objects.filter(cluster_type=ClusterType.TenDBCluster, tag__name=SystemTagEnum.TEMPORARY)
tmp_clusters = Cluster.objects.filter(cluster_type=ClusterType.TenDBCluster, tags__key=SystemTagEnum.TEMPORARY)
tmp_clusters_count = tmp_clusters.count()
# 查询定点回档记录
temp_clusters = tmp_clusters[offset : limit + offset]
Expand All @@ -120,18 +120,26 @@ def query_fixpoint_log(self, requests, *args, **kwargs):
fixpoint_logs: List[Dict[str, Any]] = []
for record in records:
ticket_data = record.ticket.details["infos"][0]
clusters = record.ticket.details["clusters"]
target_cluster = Cluster.objects.get(id=record.cluster_id)
# 获取回档的节点信息
if "rollback_host" in ticket_data:
rollback_host = ticket_data["rollback_host"]
else:
rollback_host = {
"remote_hosts": ticket_data["resource_spec"]["remote_hosts"]["hosts"],
"spider_host": ticket_data["resource_spec"]["spider_host"]["hosts"][0],
}
# 填充回档记录
fixpoint_logs.append(
{
"databases": ticket_data["databases"],
"databases_ignore": ticket_data["databases_ignore"],
"tables": ticket_data["tables"],
"tables_ignore": ticket_data["tables_ignore"],
"source_cluster": clusters[str(ticket_data["cluster_id"])],
"source_cluster": record.ticket.details["clusters"][str(ticket_data["cluster_id"])],
"target_cluster": {
"cluster_id": record.cluster_id,
"nodes": ticket_data["rollback_host"],
"nodes": rollback_host,
"status": target_cluster.status,
"phase": target_cluster.phase,
"operations": ClusterOperateRecord.objects.get_cluster_operations(record.cluster_id),
Expand Down
Loading

0 comments on commit dcfef61

Please sign in to comment.