diff --git a/dbm-ui/backend/db_meta/api/cluster/doris/__init__.py b/dbm-ui/backend/db_meta/api/cluster/doris/__init__.py index 929563c1bf..336548b2a3 100644 --- a/dbm-ui/backend/db_meta/api/cluster/doris/__init__.py +++ b/dbm-ui/backend/db_meta/api/cluster/doris/__init__.py @@ -13,5 +13,4 @@ from .disable import disable from .enable import enable from .scale_up import scale_up - -# from .shrink import shrink +from .shrink import shrink diff --git a/dbm-ui/backend/db_meta/api/cluster/doris/destroy.py b/dbm-ui/backend/db_meta/api/cluster/doris/destroy.py index 0f917aa5ce..62d34c2468 100644 --- a/dbm-ui/backend/db_meta/api/cluster/doris/destroy.py +++ b/dbm-ui/backend/db_meta/api/cluster/doris/destroy.py @@ -26,7 +26,7 @@ def destroy(cluster_id: int): """ cluster = Cluster.objects.get(id=cluster_id) - cc_manage = CcManage(bk_biz_id=cluster.bk_biz_id, db_type=DBType.Doris.value) + cc_manage = CcManage(bk_biz_id=cluster.bk_biz_id, cluster_type=cluster.cluster_type) # 删除storage instance for storage in cluster.storageinstance_set.all(): diff --git a/dbm-ui/backend/db_meta/api/cluster/doris/shrink.py b/dbm-ui/backend/db_meta/api/cluster/doris/shrink.py new file mode 100644 index 0000000000..005dba7fa2 --- /dev/null +++ b/dbm-ui/backend/db_meta/api/cluster/doris/shrink.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging +from typing import List, Optional + +from django.db import transaction + +from backend.db_meta import request_validator +from backend.db_meta.api import common +from backend.db_meta.enums import InstanceRole +from backend.db_meta.models import Cluster, ClusterEntry, StorageInstance +from backend.flow.utils.cc_manage import CcManage + +logger = logging.getLogger("root") + + +@transaction.atomic +def shrink( + cluster_id: int, + storages: Optional[List] = None, +): + """ + 缩容清理DBMeta + """ + + cluster = Cluster.objects.get(id=cluster_id) + cluster_entry = ClusterEntry.objects.get(cluster=cluster) + + storages = request_validator.validated_storage_list(storages, allow_empty=False, allow_null=False) + storage_objs = common.filter_out_instance_obj(storages, StorageInstance.objects.all()) + for storage in storage_objs: + storage.delete(keep_parents=True) + if not storage.machine.storageinstance_set.exists(): + # 将机器挪到 待回收 模块 + CcManage(storage.bk_biz_id, cluster.cluster_type).recycle_host([storage.machine.bk_host_id]) + storage.machine.delete(keep_parents=True) + + cluster.storageinstance_set.remove(*storage_objs) + cluster.save() + cluster_entry.storageinstance_set.remove(*storage_objs) + cluster_entry.save() + + # 当集群入口(域名)找不到实例,需要更新 + if not cluster_entry.storageinstance_set.exists(): + observers = cluster.storageinstance_set.filter(instance_role=InstanceRole.DORIS_OBSERVER) + followers = cluster.storageinstance_set.filter(instance_role=InstanceRole.DORIS_FOLLOWER) + + if observers.exists(): + for observer in observers: + cluster_entry.storageinstance_set.add(observer) + elif followers.exists(): + for follower in followers: + cluster_entry.storageinstance_set.add(follower) diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py index 620d92d35f..a6b875aeb8 100644 --- a/dbm-ui/backend/flow/consts.py +++ b/dbm-ui/backend/flow/consts.py @@ -598,6 +598,7 @@ class DorisActuatorActionEnum(str, StructuredEnum): RestartProcess = EnumField("restart_process", _("restart_process")) CleanData = EnumField("clean_data", _("clean_data")) UpdateMetadata = EnumField("update_metadata", _("update_metadata")) + CheckDecommission = EnumField("check_decommission", "check_decommission") class RiakModuleId(int, StructuredEnum): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_apply_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_apply_flow.py index c8dbcd5e54..fd175e4a8c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_apply_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_apply_flow.py @@ -18,7 +18,17 @@ from backend.flow.consts import DnsOpType, DorisRoleEnum, ManagerOpType, ManagerServiceType from backend.flow.engine.bamboo.scene.common.builder import Builder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList -from backend.flow.engine.bamboo.scene.doris.doris_base_flow import DorisBaseFlow, get_all_node_ips_in_ticket +from backend.flow.engine.bamboo.scene.doris.doris_base_flow import ( + DorisBaseFlow, + get_all_node_ips_in_ticket, + get_node_ips_in_ticket_by_role, + make_meta_host_map, +) +from backend.flow.engine.bamboo.scene.doris.exceptions import ( + BeMachineCountException, + RoleMachineCountException, + RoleMachineCountMustException, +) from backend.flow.plugins.components.collections.common.bigdata_manager_service import BigdataManagerComponent from backend.flow.plugins.components.collections.doris.doris_db_meta import DorisMetaComponent from backend.flow.plugins.components.collections.doris.doris_dns_manage import DorisDnsManageComponent @@ -29,6 +39,11 @@ from backend.flow.plugins.components.collections.doris.get_doris_resource import GetDorisResourceComponent from backend.flow.plugins.components.collections.doris.rewrite_doris_config import WriteBackDorisConfigComponent from backend.flow.plugins.components.collections.doris.trans_files import TransFileComponent +from backend.flow.utils.doris.consts import ( + DORIS_BACKEND_NOT_COUNT, + DORIS_FOLLOWER_MUST_COUNT, + DORIS_OBSERVER_NOT_COUNT, +) from backend.flow.utils.doris.doris_act_payload import DorisActPayload from backend.flow.utils.doris.doris_context_dataclass import DnsKwargs, DorisActKwargs, DorisApplyContext from backend.flow.utils.extension_manage import BigdataManagerKwargs @@ -56,7 +71,7 @@ def __get_flow_data(self) -> dict: master_ip = self.nodes[DorisRoleEnum.FOLLOWER][0]["ip"] flow_data["master_fe_ip"] = master_ip - host_map = self.make_meta_host_map(flow_data) + host_map = make_meta_host_map(flow_data) flow_data["host_meta_map"] = host_map return flow_data @@ -68,6 +83,9 @@ def deploy_doris_flow(self): """ doris_deploy_data = self.__get_flow_data() + # 检查单据传参 + self.check_apply_role_ip_count(doris_deploy_data) + doris_pipeline = Builder(root_id=self.root_id, data=doris_deploy_data) trans_files = GetFileList(db_type=DBType.Doris) @@ -115,7 +133,7 @@ def deploy_doris_flow(self): kwargs=asdict(act_kwargs), ) - act_kwargs.get_doris_payload_func = DorisActPayload.get_add_nodes_metadata_payload.__name__ + act_kwargs.get_doris_payload_func = DorisActPayload.get_add_metadata_payload.__name__ doris_pipeline.add_act( act_name=_("集群元数据更新"), act_component_code=ExecuteDorisActuatorScriptComponent.code, @@ -125,7 +143,7 @@ def deploy_doris_flow(self): sub_new_fe_pipelines = self.new_fe_sub_flows(act_kwargs=act_kwargs, data=doris_deploy_data) doris_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_new_fe_pipelines) # 扩容BE节点子流程 - sub_new_be_acts = self.new_bew_sub_acts(act_kwargs=act_kwargs, data=doris_deploy_data) + sub_new_be_acts = self.new_be_sub_acts(act_kwargs=act_kwargs, data=doris_deploy_data) doris_pipeline.add_parallel_acts(acts_list=sub_new_be_acts) # 插入Doris WebUI实例信息 @@ -167,3 +185,26 @@ def deploy_doris_flow(self): ) doris_pipeline.run_pipeline() + + @staticmethod + def check_apply_role_ip_count(data: dict): + # 检查 follower 数量 + follower_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.FOLLOWER)) + if follower_count != DORIS_FOLLOWER_MUST_COUNT: + logger.error(_("DorisFollower主机数不为{},当前选择数量为{}".format(DORIS_FOLLOWER_MUST_COUNT, follower_count))) + raise RoleMachineCountMustException( + doris_role=DorisRoleEnum.FOLLOWER, must_count=DORIS_FOLLOWER_MUST_COUNT + ) + + # 检查 observer 数量 + observer_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.OBSERVER)) + if observer_count == DORIS_OBSERVER_NOT_COUNT: + logger.error(_("DorisObserver主机数不能为{}".format(DORIS_OBSERVER_NOT_COUNT))) + raise RoleMachineCountException(doris_role=DorisRoleEnum.OBSERVER, machine_count=DORIS_OBSERVER_NOT_COUNT) + + # 检查数据节点的数量(hot + cold > 1) + hot_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.HOT)) + cold_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.COLD)) + if hot_count + cold_count == DORIS_BACKEND_NOT_COUNT: + logger.error(_("Doris数据节点(hot+cold)数量不能为{}".format(DORIS_BACKEND_NOT_COUNT))) + raise BeMachineCountException(must_count=DORIS_BACKEND_NOT_COUNT) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_base_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_base_flow.py index dbf920a32a..5e371b812f 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_base_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_base_flow.py @@ -8,6 +8,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ +import copy import logging.config from dataclasses import asdict from typing import Dict, Optional @@ -23,6 +24,7 @@ from backend.flow.plugins.components.collections.doris.exec_doris_actuator_script import ( ExecuteDorisActuatorScriptComponent, ) +from backend.flow.plugins.components.collections.doris.get_doris_payload import GetDorisActPayloadComponent from backend.flow.utils.base.payload_handler import PayloadHandler from backend.flow.utils.doris.consts import DorisConfigEnum from backend.flow.utils.doris.doris_act_payload import DorisActPayload @@ -32,6 +34,33 @@ logger = logging.getLogger("flow") +def make_fe_map_from_ticket(data: dict) -> dict: + host_map = {} + for role in data["nodes"]: + if role in [DorisRoleEnum.FOLLOWER.value, DorisRoleEnum.OBSERVER.value]: + ips = [node["ip"] for node in data["nodes"][role]] + host_map[role] = ips + return host_map + + +def make_be_map_from_ticket(data: dict) -> dict: + host_map = {} + for role in data["nodes"]: + if role in [DorisRoleEnum.HOT.value, DorisRoleEnum.COLD.value]: + ips = [node["ip"] for node in data["nodes"][role]] + host_map[role] = ips + return host_map + + +def make_meta_host_map(data: dict) -> dict: + host_map = {} + for role in data["nodes"]: + ips = [node["ip"] for node in data["nodes"][role]] + host_map[role] = ips + + return host_map + + class DorisBaseFlow(object): """ Doris Flow基类 @@ -109,14 +138,13 @@ def __init__(self, root_id: str, data: Optional[Dict]): self.doris_config = dbconfig["content"] self.be_conf = self.doris_config[DorisConfigEnum.Backend] self.fe_conf = self.doris_config[DorisConfigEnum.Frontend] - self.http_port = self.doris_config[DorisConfigEnum.Frontend]["http_port"] - self.query_port = self.doris_config[DorisConfigEnum.Frontend]["query_port"] + # dbconfig 默认返回字符串类型,需要转int + self.http_port = int(self.doris_config[DorisConfigEnum.Frontend]["http_port"]) + self.query_port = int(self.doris_config[DorisConfigEnum.Frontend]["query_port"]) auth_info = PayloadHandler.get_bigdata_auth_by_cluster(cluster, 0) self.username = auth_info["username"] self.password = auth_info["password"] - self.username = "username" - self.password = "password" self.master_ips = [master.machine.ip for master in masters] def get_flow_base_data(self) -> dict: @@ -144,19 +172,16 @@ def get_flow_base_data(self) -> dict: def __get_flow_data(self) -> dict: pass - def make_meta_host_map(self, data: dict) -> dict: - host_map = {} - for role in self.nodes: - ips = [node["ip"] for node in self.nodes[role]] - host_map[role] = ips - - return host_map - def get_all_node_ips_in_dbmeta(self) -> list: cluster = Cluster.objects.get(id=self.cluster_id) storage_ips = list(set(StorageInstance.objects.filter(cluster=cluster).values_list("machine__ip", flat=True))) return storage_ips + def get_role_ips_in_dbmeta(self, role: InstanceRole) -> list: + cluster = Cluster.objects.get(id=self.cluster_id) + role_ips = list(cluster.storageinstance_set.filter(instance_role=role).values_list("machine__ip", flat=True)) + return role_ips + def new_common_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list: # """ # 新增节点common操作sub_flow 数组 @@ -168,7 +193,7 @@ def new_common_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list: # 操作 # """ sub_pipelines = [] - for role, role_nodes in self.nodes.items(): + for role, role_nodes in data["nodes"].items(): for node in role_nodes: sub_pipeline = SubBuilder(root_id=self.root_id, data=data) ip = node["ip"] @@ -210,7 +235,7 @@ def new_common_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list: # 新加入frontend(follower/observer)节点子流程(不包括元数据更新) def new_fe_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list: sub_pipelines = [] - for role, role_nodes in self.nodes.items(): + for role, role_nodes in data["nodes"].items(): if role in [DorisRoleEnum.FOLLOWER.value, DorisRoleEnum.OBSERVER.value]: for fe_node in role_nodes: fe_ip = fe_node["ip"] @@ -241,9 +266,10 @@ def new_fe_sub_flows(self, act_kwargs: DorisActKwargs, data: dict) -> list: return sub_pipelines # 新加入backend(hot/cold)节点子流程(不包括元数据更新) - def new_bew_sub_acts(self, act_kwargs: DorisActKwargs, data: dict) -> list: + @staticmethod + def new_be_sub_acts(act_kwargs: DorisActKwargs, data: dict) -> list: be_acts = [] - for role, role_nodes in self.nodes.items(): + for role, role_nodes in data["nodes"].items(): if role in [DorisRoleEnum.COLD.value, DorisRoleEnum.HOT.value]: for be_node in role_nodes: act_kwargs.exec_ip = be_node["ip"] @@ -257,6 +283,92 @@ def new_bew_sub_acts(self, act_kwargs: DorisActKwargs, data: dict) -> list: be_acts.append(be_act) return be_acts + def build_del_fe_sub_flow(self, data: dict) -> SubBuilder: + # sub_flow 缩容FE 只涉及Doris集群操作,不包括清理数据目录/dbmeta等 + del_fe_data = copy.deepcopy(data) + del_fe_data["host_meta_map"] = make_fe_map_from_ticket(del_fe_data) + del_fe_sub_pipeline = SubBuilder(root_id=self.root_id, data=del_fe_data) + act_kwargs = DorisActKwargs(bk_cloud_id=self.bk_cloud_id) + + del_fe_sub_pipeline.add_act( + act_name=_("获取集群Payload"), act_component_code=GetDorisActPayloadComponent.code, kwargs=asdict(act_kwargs) + ) + + stop_fe_acts = [] + for role, role_nodes in data["nodes"].items(): + if role in [DorisRoleEnum.FOLLOWER.value, DorisRoleEnum.OBSERVER.value]: + for fe_node in role_nodes: + act_kwargs.exec_ip = fe_node["ip"] + act_kwargs.doris_role = role + act_kwargs.get_doris_payload_func = DorisActPayload.get_stop_process_payload.__name__ + fe_act = { + "act_name": _("停止DorisFE-{}-{}").format(role, fe_node["ip"]), + "act_component_code": ExecuteDorisActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + stop_fe_acts.append(fe_act) + + del_fe_sub_pipeline.add_parallel_acts(stop_fe_acts) + # 更新元数据 删除 FE + act_kwargs.exec_ip = del_fe_data["master_fe_ip"] + act_kwargs.get_doris_payload_func = DorisActPayload.get_drop_metadata_payload.__name__ + del_fe_sub_pipeline.add_act( + act_name=_("集群元数据更新-drop-fe"), + act_component_code=ExecuteDorisActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + return del_fe_sub_pipeline + + def build_del_be_sub_flow(self, data: dict) -> SubBuilder: + del_be_data = copy.deepcopy(data) + del_be_data["host_meta_map"] = make_be_map_from_ticket(del_be_data) + # sub_flow 缩容BE 只涉及Doris集群操作,不包括清理数据目录/dbmeta等 + del_be_sub_pipeline = SubBuilder(root_id=self.root_id, data=del_be_data) + act_kwargs = DorisActKwargs(bk_cloud_id=self.bk_cloud_id) + + del_be_sub_pipeline.add_act( + act_name=_("获取集群Payload"), act_component_code=GetDorisActPayloadComponent.code, kwargs=asdict(act_kwargs) + ) + # 更新元数据 退役 BE + act_kwargs.exec_ip = del_be_data["master_fe_ip"] + act_kwargs.get_doris_payload_func = DorisActPayload.get_decommission_metadata_payload.__name__ + del_be_sub_pipeline.add_act( + act_name=_("集群元数据更新-退役-BE"), + act_component_code=ExecuteDorisActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + # 等待数据搬迁 + act_kwargs.get_doris_payload_func = DorisActPayload.get_check_decommission_payload.__name__ + del_be_sub_pipeline.add_act( + act_name=_("检查数据节点是否退役"), + act_component_code=ExecuteDorisActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + # 更新元数据 删除 BE + act_kwargs.get_doris_payload_func = DorisActPayload.get_force_drop_metadata_payload.__name__ + del_be_sub_pipeline.add_act( + act_name=_("集群元数据更新-删除-BE"), + act_component_code=ExecuteDorisActuatorScriptComponent.code, + kwargs=asdict(act_kwargs), + ) + + stop_be_acts = [] + for role, role_nodes in data["nodes"].items(): + if role in [DorisRoleEnum.HOT.value, DorisRoleEnum.COLD.value]: + for be_node in role_nodes: + act_kwargs.exec_ip = be_node["ip"] + act_kwargs.doris_role = role + act_kwargs.get_doris_payload_func = DorisActPayload.get_stop_process_payload.__name__ + be_act = { + "act_name": _("停止DorisBE-{}-{}").format(role, be_node["ip"]), + "act_component_code": ExecuteDorisActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + stop_be_acts.append(be_act) + + del_be_sub_pipeline.add_parallel_acts(stop_be_acts) + return del_be_sub_pipeline + def get_node_ips_in_ticket_by_role(data: dict, role: str) -> list: if role not in data.get("nodes"): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_replace_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_replace_flow.py new file mode 100644 index 0000000000..e5d8f93cb7 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_replace_flow.py @@ -0,0 +1,271 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +from dataclasses import asdict +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.configuration.constants import DBType +from backend.db_meta.enums import InstanceRole +from backend.flow.consts import DnsOpType, DorisRoleEnum +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.doris.doris_base_flow import ( + DorisBaseFlow, + be_exists_in_ticket, + fe_exists_in_ticket, + get_all_node_ips_in_ticket, + make_meta_host_map, +) +from backend.flow.engine.bamboo.scene.doris.exceptions import ( + ReplaceMachineCountException, + RoleMachineCountMustException, +) +from backend.flow.plugins.components.collections.doris.doris_db_meta import DorisMetaComponent +from backend.flow.plugins.components.collections.doris.doris_dns_manage import DorisDnsManageComponent +from backend.flow.plugins.components.collections.doris.exec_doris_actuator_script import ( + ExecuteDorisActuatorScriptComponent, +) +from backend.flow.plugins.components.collections.doris.get_doris_payload import GetDorisActPayloadComponent +from backend.flow.plugins.components.collections.doris.get_doris_resource import GetDorisResourceComponent +from backend.flow.plugins.components.collections.es.trans_files import TransFileComponent +from backend.flow.utils.doris.consts import DORIS_FOLLOWER_MUST_COUNT +from backend.flow.utils.doris.doris_act_payload import DorisActPayload +from backend.flow.utils.doris.doris_context_dataclass import DnsKwargs, DorisActKwargs, DorisApplyContext +from backend.ticket.constants import TicketType + +logger = logging.getLogger("flow") + + +class DorisReplaceFlow(DorisBaseFlow): + """ + Doris替换流程 + """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + :param root_id: 任务流程定义的root_id + :param data: 单据传递过来的参数列表,字典格式 + """ + super().__init__(root_id, data) + self.new_nodes = data["new_nodes"] + self.old_nodes = data["old_nodes"] + + def __get_flow_data(self) -> dict: + flow_data = self.get_flow_base_data() + flow_data["new_nodes"] = self.new_nodes + flow_data["old_nodes"] = self.old_nodes + return flow_data + + def __get_scale_up_flow_data(self) -> dict: + flow_data = self.get_flow_base_data() + flow_data["nodes"] = self.new_nodes + flow_data["ticket_type"] = TicketType.DORIS_SCALE_UP.value + follower_ips = self.get_role_ips_in_dbmeta(InstanceRole.DORIS_FOLLOWER) + # 增加follower 数量 判断(不判断严格等于,兼容替换场景) + if len(follower_ips) < DORIS_FOLLOWER_MUST_COUNT: + logger.error("get follower ips from dbmeta, count is {}, invalid".format(len(follower_ips))) + raise RoleMachineCountMustException( + doris_role=DorisRoleEnum.FOLLOWER, must_count=DORIS_FOLLOWER_MUST_COUNT + ) + + # 随机选取follower 作为添加元数据的操作IP + flow_data["master_fe_ip"] = follower_ips[0] + host_map = make_meta_host_map(flow_data) + flow_data["host_meta_map"] = host_map + return flow_data + + def __get_shrink_flow_data(self) -> dict: + flow_data = self.get_flow_base_data() + flow_data["nodes"] = self.old_nodes + flow_data["ticket_type"] = TicketType.DORIS_SHRINK.value + # 选取 master_fe_ip 与其他流程不同,若有follower角色替换,直接获取新的follower ip作为master_fe_ip + if DorisRoleEnum.FOLLOWER in self.new_nodes and self.new_nodes[DorisRoleEnum.FOLLOWER]: + new_follower_ips = [node["ip"] for node in self.new_nodes[DorisRoleEnum.FOLLOWER]] + flow_data["master_fe_ip"] = new_follower_ips[0] + else: + follower_ips = self.get_role_ips_in_dbmeta(InstanceRole.DORIS_FOLLOWER) + # 增加follower 数量 判断(不判断严格等于,兼容替换场景) + if len(follower_ips) < DORIS_FOLLOWER_MUST_COUNT: + logger.error("get follower ips from dbmeta, count is {}, invalid".format(len(follower_ips))) + raise RoleMachineCountMustException( + doris_role=DorisRoleEnum.FOLLOWER, must_count=DORIS_FOLLOWER_MUST_COUNT + ) + # 随机选取follower 作为添加元数据的操作IP + flow_data["master_fe_ip"] = follower_ips[0] + + return flow_data + + def replace_doris_flow(self): + """ + 定义替换Doris集群 + :return: + """ + replace_data = self.__get_flow_data() + # 检查 替换表单角色机器数量是否合法 + self.check_replace_role_ip_count(replace_data) + + doris_pipeline = Builder(root_id=self.root_id, data=replace_data) + # 扩容子流程 + scale_up_data = self.__get_scale_up_flow_data() + scale_up_sub_pipeline = SubBuilder(root_id=self.root_id, data=scale_up_data) + + trans_files = GetFileList(db_type=DBType.Doris) + # 扩容流程使用的act_kwargs + new_act_kwargs = DorisActKwargs(bk_cloud_id=self.bk_cloud_id) + new_act_kwargs.set_trans_data_dataclass = DorisApplyContext.__name__ + new_act_kwargs.file_list = trans_files.doris_apply(self.db_version) + + scale_up_sub_pipeline.add_act( + act_name=_("获取Payload"), act_component_code=GetDorisActPayloadComponent.code, kwargs=asdict(new_act_kwargs) + ) + + # 获取机器资源 + scale_up_sub_pipeline.add_act( + act_name=_("获取机器信息"), act_component_code=GetDorisResourceComponent.code, kwargs=asdict(new_act_kwargs) + ) + """ + new_act_kwargs.exec_ip = get_all_node_ips_in_ticket(data=scale_up_data) + scale_up_sub_pipeline.add_act( + act_name=_("下发DORIS介质"), act_component_code=TransFileComponent.code, kwargs=asdict(new_act_kwargs) + ) + + # 新节点统一初始化流程 + sub_common_pipelines = self.new_common_sub_flows(act_kwargs=new_act_kwargs, data=scale_up_data) + # 并发执行所有子流程 + scale_up_sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_common_pipelines) + + new_act_kwargs.exec_ip = scale_up_data["master_fe_ip"] + new_act_kwargs.get_doris_payload_func = DorisActPayload.get_add_nodes_metadata_payload.__name__ + scale_up_sub_pipeline.add_act( + act_name=_("集群元数据更新"), + act_component_code=ExecuteDorisActuatorScriptComponent.code, + kwargs=asdict(new_act_kwargs), + ) + """ + + # 扩容FE节点子流程 + if fe_exists_in_ticket(data=scale_up_data): + sub_new_fe_pipelines = self.new_fe_sub_flows(act_kwargs=new_act_kwargs, data=scale_up_data) + scale_up_sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_new_fe_pipelines) + + # 扩容BE节点子流程 + if be_exists_in_ticket(data=scale_up_data): + sub_new_be_acts = self.new_be_sub_acts(act_kwargs=new_act_kwargs, data=scale_up_data) + scale_up_sub_pipeline.add_parallel_acts(acts_list=sub_new_be_acts) + + # 添加到DBMeta并转模块 + scale_up_sub_pipeline.add_act( + act_name=_("添加到DBMeta"), act_component_code=DorisMetaComponent.code, kwargs=asdict(new_act_kwargs) + ) + + # 添加域名 + dns_kwargs = DnsKwargs( + bk_cloud_id=scale_up_data["bk_cloud_id"], + dns_op_type=DnsOpType.UPDATE, + domain_name=self.domain, + dns_op_exec_port=self.http_port, + ) + scale_up_sub_pipeline.add_act( + act_name=_("添加域名"), + act_component_code=DorisDnsManageComponent.code, + kwargs={**asdict(new_act_kwargs), **asdict(dns_kwargs)}, + ) + + doris_pipeline.add_sub_pipeline(sub_flow=scale_up_sub_pipeline.build_sub_process(sub_name=_("扩容子流程"))) + + # 缩容子流程 + shrink_data = self.__get_shrink_flow_data() + shrink_sub_pipeline = SubBuilder(root_id=self.root_id, data=shrink_data) + + shrink_act_kwargs = DorisActKwargs(bk_cloud_id=shrink_data["bk_cloud_id"]) + shrink_act_kwargs.set_trans_data_dataclass = DorisApplyContext.__name__ + shrink_act_kwargs.file_list = trans_files.doris_actuator() + shrink_sub_pipeline.add_act( + act_name=_("获取缩容流程集群部署配置"), + act_component_code=GetDorisActPayloadComponent.code, + kwargs=asdict(shrink_act_kwargs), + ) + # 更新dbactor介质包 + shrink_act_kwargs.exec_ip = get_all_node_ips_in_ticket(data=shrink_data) + shrink_sub_pipeline.add_act( + act_name=_("下发DORIS介质"), act_component_code=TransFileComponent.code, kwargs=asdict(shrink_act_kwargs) + ) + + # 缩容FE节点子流程 + if fe_exists_in_ticket(data=shrink_data): + del_fe_pipeline = self.build_del_fe_sub_flow(data=shrink_data) + shrink_sub_pipeline.add_sub_pipeline(del_fe_pipeline.build_sub_process(sub_name=_("缩容管理节点FE"))) + + # 缩容BE节点子流程 + if be_exists_in_ticket(data=shrink_data): + del_be_pipeline = self.build_del_be_sub_flow(data=shrink_data) + shrink_sub_pipeline.add_sub_pipeline(del_be_pipeline.build_sub_process(sub_name=_("缩容数据节点BE"))) + + shrink_ips = get_all_node_ips_in_ticket(shrink_data) + shrink_ip_acts = [] + for ip in shrink_ips: + # 节点清理 + shrink_act_kwargs.get_doris_payload_func = DorisActPayload.get_clean_data_payload.__name__ + shrink_act_kwargs.exec_ip = ip + act = { + "act_name": _("Doris集群节点清理-{}").format(ip), + "act_component_code": ExecuteDorisActuatorScriptComponent.code, + "kwargs": asdict(shrink_act_kwargs), + } + shrink_ip_acts.append(act) + + shrink_sub_pipeline.add_parallel_acts(acts_list=shrink_ip_acts) + # 添加到DBMeta并转模块 + shrink_sub_pipeline.add_act( + act_name=_("更新DBMeta"), act_component_code=DorisMetaComponent.code, kwargs=asdict(shrink_act_kwargs) + ) + + # 更新域名 + dns_kwargs = DnsKwargs( + bk_cloud_id=replace_data["bk_cloud_id"], + dns_op_type=DnsOpType.UPDATE, + domain_name=self.domain, + dns_op_exec_port=self.http_port, + ) + shrink_sub_pipeline.add_act( + act_name=_("更新域名"), + act_component_code=DorisDnsManageComponent.code, + kwargs={**asdict(shrink_act_kwargs), **asdict(dns_kwargs)}, + ) + doris_pipeline.add_sub_pipeline(sub_flow=shrink_sub_pipeline.build_sub_process(sub_name=_("缩容子流程"))) + + doris_pipeline.run_pipeline() + + def check_replace_role_ip_count(self, data: dict): + old_role_nodes = {} + new_role_nodes = {} + # 构建替换表单的角色IP数量 不检查被替换IP是否在集群内 + for role in DorisRoleEnum: + if role not in data.get("old_nodes"): + old_ips = [] + else: + old_ips = [node["ip"] for node in data["old_nodes"][role]] + old_role_nodes[role] = old_ips + + if role not in data.get("new_nodes"): + new_ips = [] + else: + new_ips = [node["ip"] for node in data["new_nodes"][role]] + new_role_nodes[role] = new_ips + + for role, ips in old_role_nodes.items(): + old_ips_cnt = len(ips) + new_ips_cnt = len(new_role_nodes[role]) + if old_ips_cnt != new_ips_cnt: + logger.error("ticket_type: %s, role is %s, machine count mismatch.", self.ticket_type, role) + raise ReplaceMachineCountException(doris_role=role, old_count=old_ips_cnt, new_count=new_ips_cnt) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_scale_up_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_scale_up_flow.py index f943013ada..bf1f66487c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_scale_up_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_scale_up_flow.py @@ -15,7 +15,8 @@ from django.utils.translation import ugettext as _ from backend.configuration.constants import DBType -from backend.flow.consts import DnsOpType +from backend.db_meta.enums import InstanceRole +from backend.flow.consts import DnsOpType, DorisRoleEnum from backend.flow.engine.bamboo.scene.common.builder import Builder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.doris.doris_base_flow import ( @@ -23,6 +24,13 @@ be_exists_in_ticket, fe_exists_in_ticket, get_all_node_ips_in_ticket, + get_node_ips_in_ticket_by_role, + make_meta_host_map, +) +from backend.flow.engine.bamboo.scene.doris.exceptions import ( + FollowerScaleUpUnsupportedException, + RoleMachineCountException, + RoleMachineCountMustException, ) from backend.flow.plugins.components.collections.doris.doris_db_meta import DorisMetaComponent from backend.flow.plugins.components.collections.doris.doris_dns_manage import DorisDnsManageComponent @@ -32,6 +40,7 @@ from backend.flow.plugins.components.collections.doris.get_doris_payload import GetDorisActPayloadComponent from backend.flow.plugins.components.collections.doris.get_doris_resource import GetDorisResourceComponent from backend.flow.plugins.components.collections.es.trans_files import TransFileComponent +from backend.flow.utils.doris.consts import DORIS_FOLLOWER_MUST_COUNT, DORIS_OBSERVER_NOT_COUNT from backend.flow.utils.doris.doris_act_payload import DorisActPayload from backend.flow.utils.doris.doris_context_dataclass import DnsKwargs, DorisActKwargs, DorisApplyContext @@ -52,6 +61,19 @@ def __init__(self, root_id: str, data: Optional[Dict]): def __get_flow_data(self) -> dict: flow_data = self.get_flow_base_data() + flow_data["nodes"] = self.nodes + follower_ips = self.get_role_ips_in_dbmeta(InstanceRole.DORIS_FOLLOWER) + # 增加follower 数量 判断(不判断严格等于,兼容替换场景) + if len(follower_ips) < DORIS_FOLLOWER_MUST_COUNT: + logger.error("get follower ips from dbmeta, count is {}, invalid".format(len(follower_ips))) + raise RoleMachineCountMustException( + doris_role=DorisRoleEnum.FOLLOWER, must_count=DORIS_FOLLOWER_MUST_COUNT + ) + + # 随机选取follower 作为添加元数据的操作IP + flow_data["master_fe_ip"] = follower_ips[0] + host_map = make_meta_host_map(flow_data) + flow_data["host_meta_map"] = host_map return flow_data def scale_up_doris_flow(self): @@ -60,6 +82,9 @@ def scale_up_doris_flow(self): :return: """ scale_up_data = self.__get_flow_data() + # 检查 扩容表单角色机器数量是否合法 + self.check_scale_up_role_ip_count(scale_up_data) + doris_pipeline = Builder(root_id=self.root_id, data=scale_up_data) trans_files = GetFileList(db_type=DBType.Doris) @@ -90,7 +115,8 @@ def scale_up_doris_flow(self): # 并发执行所有子流程 doris_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_common_pipelines) - act_kwargs.get_doris_payload_func = DorisActPayload.get_add_nodes_metadata_payload.__name__ + act_kwargs.exec_ip = scale_up_data["master_fe_ip"] + act_kwargs.get_doris_payload_func = DorisActPayload.get_add_metadata_payload.__name__ doris_pipeline.add_act( act_name=_("集群元数据更新"), act_component_code=ExecuteDorisActuatorScriptComponent.code, @@ -103,7 +129,7 @@ def scale_up_doris_flow(self): # 扩容BE节点子流程 if be_exists_in_ticket(data=scale_up_data): - sub_new_be_acts = self.new_bew_sub_acts(act_kwargs=act_kwargs, data=scale_up_data) + sub_new_be_acts = self.new_be_sub_acts(act_kwargs=act_kwargs, data=scale_up_data) doris_pipeline.add_parallel_acts(acts_list=sub_new_be_acts) # 添加到DBMeta并转模块 @@ -125,3 +151,19 @@ def scale_up_doris_flow(self): ) doris_pipeline.run_pipeline() + + def check_scale_up_role_ip_count(self, data: dict): + # 扩容无需检查数据节点数量 + + # 检查 follower 数量 + follower_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.FOLLOWER)) + if follower_count > 0: + logger.error(_("DorisFollower不支持扩容,当前选择扩容机器数量为{}".format(follower_count))) + raise FollowerScaleUpUnsupportedException(machine_count=follower_count) + + # 检查 observer 数量 + add_observer_cnt = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.OBSERVER)) + former_observer_cnt = len(self.get_role_ips_in_dbmeta(InstanceRole.DORIS_OBSERVER)) + if add_observer_cnt + former_observer_cnt == DORIS_OBSERVER_NOT_COUNT: + logger.error(_("DorisObserver主机数不能为{}".format(DORIS_OBSERVER_NOT_COUNT))) + raise RoleMachineCountException(doris_role=DorisRoleEnum.OBSERVER, machine_count=DORIS_OBSERVER_NOT_COUNT) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_shrink_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_shrink_flow.py new file mode 100644 index 0000000000..50cde34501 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/doris/doris_shrink_flow.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +from dataclasses import asdict +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.configuration.constants import DBType +from backend.db_meta.enums import InstanceRole +from backend.flow.consts import DnsOpType, DorisRoleEnum +from backend.flow.engine.bamboo.scene.common.builder import Builder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.doris.doris_base_flow import ( + DorisBaseFlow, + be_exists_in_ticket, + fe_exists_in_ticket, + get_all_node_ips_in_ticket, + get_node_ips_in_ticket_by_role, +) +from backend.flow.engine.bamboo.scene.doris.exceptions import ( + FollowerScaleUpUnsupportedException, + RoleMachineCountException, + RoleMachineCountMustException, +) +from backend.flow.plugins.components.collections.doris.doris_db_meta import DorisMetaComponent +from backend.flow.plugins.components.collections.doris.doris_dns_manage import DorisDnsManageComponent +from backend.flow.plugins.components.collections.doris.exec_doris_actuator_script import ( + ExecuteDorisActuatorScriptComponent, +) +from backend.flow.plugins.components.collections.doris.get_doris_payload import GetDorisActPayloadComponent +from backend.flow.plugins.components.collections.doris.get_doris_resource import GetDorisResourceComponent +from backend.flow.plugins.components.collections.es.trans_files import TransFileComponent +from backend.flow.utils.doris.consts import DORIS_FOLLOWER_MUST_COUNT, DORIS_OBSERVER_NOT_COUNT +from backend.flow.utils.doris.doris_act_payload import DorisActPayload +from backend.flow.utils.doris.doris_context_dataclass import DnsKwargs, DorisActKwargs, DorisApplyContext + +logger = logging.getLogger("flow") + + +class DorisShrinkFlow(DorisBaseFlow): + """ + Doris缩容流程 + """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + :param root_id: 任务流程定义的root_id + :param data: 单据传递过来的参数列表,字典格式 + """ + super().__init__(root_id, data) + + def __get_flow_data(self) -> dict: + flow_data = self.get_flow_base_data() + flow_data["nodes"] = self.nodes + follower_ips = self.get_role_ips_in_dbmeta(InstanceRole.DORIS_FOLLOWER) + # 增加follower 数量 判断(不判断严格等于,兼容替换场景) + if len(follower_ips) < DORIS_FOLLOWER_MUST_COUNT: + logger.error("get follower ips from dbmeta, count is {}, invalid".format(len(follower_ips))) + raise RoleMachineCountMustException( + doris_role=DorisRoleEnum.FOLLOWER, must_count=DORIS_FOLLOWER_MUST_COUNT + ) + + # 随机选取follower 作为添加元数据的操作IP + flow_data["master_fe_ip"] = follower_ips[0] + + return flow_data + + def shrink_doris_flow(self): + """ + 定义缩容Doris集群 + :return: + """ + shrink_data = self.__get_flow_data() + # 检查 缩容表单角色机器数量是否合法 + self.check_shrink_role_ip_count(shrink_data) + + doris_pipeline = Builder(root_id=self.root_id, data=shrink_data) + + trans_files = GetFileList(db_type=DBType.Doris) + + act_kwargs = DorisActKwargs(bk_cloud_id=self.bk_cloud_id) + act_kwargs.set_trans_data_dataclass = DorisApplyContext.__name__ + act_kwargs.file_list = trans_files.doris_actuator() + + doris_pipeline.add_act( + act_name=_("获取集群部署配置"), act_component_code=GetDorisActPayloadComponent.code, kwargs=asdict(act_kwargs) + ) + + # 获取机器资源 + doris_pipeline.add_act( + act_name=_("获取机器信息"), act_component_code=GetDorisResourceComponent.code, kwargs=asdict(act_kwargs) + ) + + # 更新dbactor介质包 + act_kwargs.exec_ip = get_all_node_ips_in_ticket(data=shrink_data) + doris_pipeline.add_act( + act_name=_("下发DORIS介质"), act_component_code=TransFileComponent.code, kwargs=asdict(act_kwargs) + ) + + # 更新域名 + dns_kwargs = DnsKwargs( + bk_cloud_id=shrink_data["bk_cloud_id"], + dns_op_type=DnsOpType.UPDATE, + domain_name=self.domain, + dns_op_exec_port=self.http_port, + ) + doris_pipeline.add_act( + act_name=_("更新域名"), + act_component_code=DorisDnsManageComponent.code, + kwargs={**asdict(act_kwargs), **asdict(dns_kwargs)}, + ) + + # 缩容FE节点子流程 + if fe_exists_in_ticket(data=shrink_data): + del_fe_pipeline = self.build_del_fe_sub_flow(data=shrink_data) + doris_pipeline.add_sub_pipeline(del_fe_pipeline.build_sub_process(sub_name=_("缩容管理节点FE"))) + + # 缩容BE节点子流程 + if be_exists_in_ticket(data=shrink_data): + del_be_pipeline = self.build_del_be_sub_flow(data=shrink_data) + doris_pipeline.add_sub_pipeline(del_be_pipeline.build_sub_process(sub_name=_("缩容数据节点BE"))) + + shrink_ips = get_all_node_ips_in_ticket(shrink_data) + shrink_ip_acts = [] + for ip in shrink_ips: + # 节点清理 + act_kwargs.get_doris_payload_func = DorisActPayload.get_clean_data_payload.__name__ + act_kwargs.exec_ip = ip + act = { + "act_name": _("Doris集群节点清理-{}").format(ip), + "act_component_code": ExecuteDorisActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + } + shrink_ip_acts.append(act) + + doris_pipeline.add_parallel_acts(acts_list=shrink_ip_acts) + # 添加到DBMeta并转模块 + doris_pipeline.add_act( + act_name=_("更新DBMeta"), act_component_code=DorisMetaComponent.code, kwargs=asdict(act_kwargs) + ) + + doris_pipeline.run_pipeline() + + def check_shrink_role_ip_count(self, data: dict): + # 扩容无需检查数据节点数量 + + # 检查 follower 数量 + follower_count = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.FOLLOWER)) + if follower_count > 0: + logger.error(_("DorisFollower不支持缩容,当前选择缩容机器数量为{}".format(follower_count))) + raise FollowerScaleUpUnsupportedException(machine_count=follower_count) + + # 检查 observer 数量 + del_observer_cnt = len(get_node_ips_in_ticket_by_role(data, DorisRoleEnum.OBSERVER)) + former_observer_cnt = len(self.get_role_ips_in_dbmeta(InstanceRole.DORIS_OBSERVER)) + if former_observer_cnt - del_observer_cnt == DORIS_OBSERVER_NOT_COUNT: + logger.error(_("DorisObserver主机数不能为{}".format(DORIS_OBSERVER_NOT_COUNT))) + raise RoleMachineCountException(doris_role=DorisRoleEnum.OBSERVER, machine_count=DORIS_OBSERVER_NOT_COUNT) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/doris/exceptions.py b/dbm-ui/backend/flow/engine/bamboo/scene/doris/exceptions.py new file mode 100644 index 0000000000..29dfd8b9c7 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/doris/exceptions.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from django.utils.translation import ugettext as _ + +from backend.exceptions import AppBaseException, ErrorCode + + +class DorisFlowBaseException(AppBaseException): + MODULE_CODE = ErrorCode.FLOW_CODE + MESSAGE = _("Flow模块Doris异常") + + +class NormalDorisFlowException(DorisFlowBaseException): + ERROR_CODE = "001" + MESSAGE = _("通用异常") + MESSAGE_TPL = _("{message}") + + +class RoleMachineCountException(DorisFlowBaseException): + ERROR_CODE = "002" + MESSAGE = _("机器数量异常") + MESSAGE_TPL = _("Doris集群角色{doris_role}数量不能为{machine_count}") + + +class RoleMachineCountMustException(DorisFlowBaseException): + ERROR_CODE = "003" + MESSAGE = _("机器数量异常") + MESSAGE_TPL = _("Doris集群角色{doris_role}机器数量必须为{must_count}") + + +class BeMachineCountException(DorisFlowBaseException): + ERROR_CODE = "004" + MESSAGE = _("机器数量异常") + MESSAGE_TPL = _("Doris集群数据节点(热+冷)机器数量之和不能为{must_count}") + + +class FollowerScaleUpUnsupportedException(DorisFlowBaseException): + ERROR_CODE = "005" + MESSAGE = _("机器数量异常") + MESSAGE_TPL = _("Doris集群Follower不支持扩容, 当前选择扩容机器数量为{machine_count}") + + +class ReplaceMachineCountException(DorisFlowBaseException): + ERROR_CODE = "006" + MESSAGE = _("Doris替换机器数量不一致") + MESSAGE_TPL = _("Doris集群替换角色{doris_role}数量不一致,已选旧节点个数{old_count},新节点个数{new_count}") diff --git a/dbm-ui/backend/flow/engine/controller/doris.py b/dbm-ui/backend/flow/engine/controller/doris.py index cf7d60ed12..11199a34f6 100644 --- a/dbm-ui/backend/flow/engine/controller/doris.py +++ b/dbm-ui/backend/flow/engine/controller/doris.py @@ -15,7 +15,9 @@ from backend.flow.engine.bamboo.scene.doris.doris_disable_flow import DorisDisableFlow from backend.flow.engine.bamboo.scene.doris.doris_enable_flow import DorisEnableFlow from backend.flow.engine.bamboo.scene.doris.doris_reboot_flow import DorisRebootFlow +from backend.flow.engine.bamboo.scene.doris.doris_replace_flow import DorisReplaceFlow from backend.flow.engine.bamboo.scene.doris.doris_scale_up_flow import DorisScaleUpFlow +from backend.flow.engine.bamboo.scene.doris.doris_shrink_flow import DorisShrinkFlow from backend.flow.engine.controller.base import BaseController logger = logging.getLogger("Controller") @@ -61,20 +63,20 @@ def doris_destroy_scene(self): flow = DorisDestroyFlow(root_id=self.root_id, data=self.ticket_data) flow.destroy_doris_flow() - # def doris_shrink_scene(self): - # """ - # doris缩容 - # """ - # flow = DorisShrinkFlow(root_id=self.root_id, data=self.ticket_data) - # flow.shrink_doris_flow() - # - # def doris_replace_scene(self): - # """ - # doris替换 - # """ - # flow = DorisReplaceFlow(root_id=self.root_id, data=self.ticket_data) - # flow.replace_doris_flow() - # + def doris_shrink_scene(self): + """ + doris缩容 + """ + flow = DorisShrinkFlow(root_id=self.root_id, data=self.ticket_data) + flow.shrink_doris_flow() + + def doris_replace_scene(self): + """ + doris替换 + """ + flow = DorisReplaceFlow(root_id=self.root_id, data=self.ticket_data) + flow.replace_doris_flow() + def doris_reboot_scene(self): """ doris重启节点 diff --git a/dbm-ui/backend/flow/plugins/components/collections/doris/rewrite_doris_config.py b/dbm-ui/backend/flow/plugins/components/collections/doris/rewrite_doris_config.py index 49747dcd83..8706a60f18 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/doris/rewrite_doris_config.py +++ b/dbm-ui/backend/flow/plugins/components/collections/doris/rewrite_doris_config.py @@ -82,7 +82,7 @@ def write_auth_to_prv_manager(self, global_data: dict): "instances": [{"ip": global_data["domain"], "port": 0, "bk_cloud_id": global_data["bk_cloud_id"]}], "password": base64.b64encode(str(global_data["username"]).encode("utf-8")).decode("utf-8"), "username": MySQLPrivComponent.DORIS_FAKE_USER.value, - "component": NameSpaceEnum.Pulsar, + "component": NameSpaceEnum.Doris, "operator": "admin", } DBPrivManagerApi.modify_password(params=query_params) @@ -91,7 +91,7 @@ def write_auth_to_prv_manager(self, global_data: dict): "instances": [{"ip": global_data["domain"], "port": 0, "bk_cloud_id": global_data["bk_cloud_id"]}], "password": base64.b64encode(str(global_data["password"]).encode("utf-8")).decode("utf-8"), "username": global_data["username"], - "component": NameSpaceEnum.Pulsar, + "component": NameSpaceEnum.Doris, "operator": "admin", } DBPrivManagerApi.modify_password(params=query_params) diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index e98068f7b9..0ff3c6e8f6 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -25,7 +25,9 @@ from backend.flow.views.doris_disable import DisableDorisSceneApiView from backend.flow.views.doris_enable import EnableDorisSceneApiView from backend.flow.views.doris_reboot import RebootDorisSceneApiView +from backend.flow.views.doris_replace import ReplaceDorisSceneApiView from backend.flow.views.doris_scale_up import ScaleUpDorisSceneApiView +from backend.flow.views.doris_shrink import ShrinkDorisSceneApiView from backend.flow.views.es_apply import InstallEsSceneApiView from backend.flow.views.es_destroy import DestroyEsSceneApiView from backend.flow.views.es_disable import DisableEsSceneApiView @@ -492,4 +494,6 @@ url(r"^scene/disable_doris$", DisableDorisSceneApiView.as_view()), url(r"^scene/destroy_doris$", DestroyDorisSceneApiView.as_view()), url(r"^scene/reboot_doris$", RebootDorisSceneApiView.as_view()), + url(r"^scene/shrink_doris$", ShrinkDorisSceneApiView.as_view()), + url(r"^scene/replace_doris$", ReplaceDorisSceneApiView.as_view()), ] diff --git a/dbm-ui/backend/flow/utils/base/payload_handler.py b/dbm-ui/backend/flow/utils/base/payload_handler.py index 82b62c32d5..d9dfccb4a3 100644 --- a/dbm-ui/backend/flow/utils/base/payload_handler.py +++ b/dbm-ui/backend/flow/utils/base/payload_handler.py @@ -423,6 +423,8 @@ def get_bigdata_user_key(cluster_type: str) -> MySQLPrivComponent: return MySQLPrivComponent.KAFKA_FAKE_USER elif cluster_type == ClusterType.Pulsar.value: return MySQLPrivComponent.PULSAR_FAKE_USER + elif cluster_type == ClusterType.Doris.value: + return MySQLPrivComponent.DORIS_FAKE_USER @staticmethod def get_bigdata_username_by_cluster(cluster: Cluster, port: int) -> str: diff --git a/dbm-ui/backend/flow/utils/doris/consts.py b/dbm-ui/backend/flow/utils/doris/consts.py index c7152597ee..3eff7de372 100644 --- a/dbm-ui/backend/flow/utils/doris/consts.py +++ b/dbm-ui/backend/flow/utils/doris/consts.py @@ -26,6 +26,7 @@ class DorisMetaOperation(str, StructuredEnum): Add = EnumField("ADD", _("ADD")) Drop = EnumField("DROP", _("DROP")) Decommission = EnumField("DECOMMISSION", _("DECOMMISSION")) + ForceDrop = EnumField("DROPP", _("DROPP")) class DorisNodeOperation(str, StructuredEnum): @@ -35,3 +36,6 @@ class DorisNodeOperation(str, StructuredEnum): DORIS_ROLE_ALL = "all" +DORIS_FOLLOWER_MUST_COUNT = 3 +DORIS_OBSERVER_NOT_COUNT = 1 +DORIS_BACKEND_NOT_COUNT = 0 diff --git a/dbm-ui/backend/flow/utils/doris/doris_act_payload.py b/dbm-ui/backend/flow/utils/doris/doris_act_payload.py index c0f89836f9..d2e071173d 100644 --- a/dbm-ui/backend/flow/utils/doris/doris_act_payload.py +++ b/dbm-ui/backend/flow/utils/doris/doris_act_payload.py @@ -23,6 +23,20 @@ def __init__(self, ticket_data: dict): self.bk_biz_id = str(ticket_data["bk_biz_id"]) self.ticket_data = ticket_data + # 定义常规extend参数 + def get_common_extend(self, **kwargs) -> dict: + return { + "host": kwargs["ip"], + "cluster_name": self.ticket_data["cluster_name"], + "version": self.ticket_data["db_version"], + "role": kwargs["role"], + "username": self.ticket_data["username"], + "password": self.ticket_data["password"], + "http_port": self.ticket_data["http_port"], + "query_port": self.ticket_data["query_port"], + # "master_fe_ip": self.ticket_data["master_fe_ip"], + } + def get_sys_init_payload(self, **kwargs) -> dict: """ 拼接初始化机器的payload参数 @@ -60,46 +74,30 @@ def get_install_supervisor_payload(self, **kwargs) -> dict: } def get_render_config_payload(self, **kwargs) -> dict: + extend_dict = { + "fe_conf": self.ticket_data["fe_conf"], + "be_conf": self.ticket_data["be_conf"], + "master_fe_ip": self.ticket_data["master_fe_ip"], + } return { "db_type": DBActuatorTypeEnum.Doris.value, "action": DorisActuatorActionEnum.RenderConfig.value, "payload": { "general": {}, - "extend": { - "version": self.ticket_data["db_version"], - # 目标机器IP,目标机器获取IP比较麻烦,不易知道哪块网卡 - "host": kwargs["ip"], - "role": kwargs["role"], - "username": self.ticket_data["username"], - "password": self.ticket_data["password"], - "cluster_name": self.ticket_data["cluster_name"], - "http_port": self.ticket_data["http_port"], - "query_port": self.ticket_data["query_port"], - "master_fe_ip": self.ticket_data["master_fe_ip"], - "fe_conf": self.ticket_data["fe_conf"], - "be_conf": self.ticket_data["be_conf"], - }, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), }, } def get_start_fe_by_helper_payload(self, **kwargs) -> dict: + extend_dict = { + "master_fe_ip": self.ticket_data["master_fe_ip"], + } return { "db_type": DBActuatorTypeEnum.Doris.value, "action": DorisActuatorActionEnum.StartFeByHelper.value, "payload": { "general": {}, - "extend": { - "version": self.ticket_data["db_version"], - # 目标机器IP,目标机器获取IP比较麻烦,不易知道哪块网卡 - "host": kwargs["ip"], - "role": kwargs["role"], - "username": self.ticket_data["username"], - "password": self.ticket_data["password"], - "cluster_name": self.ticket_data["cluster_name"], - "http_port": self.ticket_data["http_port"], - "query_port": self.ticket_data["query_port"], - "master_fe_ip": self.ticket_data["master_fe_ip"], - }, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), }, } @@ -117,25 +115,67 @@ def get_decompress_doris_pkg_payload(self, **kwargs) -> dict: }, } - def get_add_nodes_metadata_payload(self, **kwargs) -> dict: + # 添加节点到元数据 + def get_add_metadata_payload(self, **kwargs) -> dict: + extend_dict = { + "master_fe_ip": self.ticket_data["master_fe_ip"], + "operation": DorisMetaOperation.Add.value, + "host_map": self.ticket_data["host_meta_map"], + } return { "db_type": DBActuatorTypeEnum.Doris.value, "action": DorisActuatorActionEnum.UpdateMetadata.value, "payload": { "general": {}, - "extend": { - "version": self.ticket_data["db_version"], - "host": kwargs["ip"], - "role": kwargs["role"], - "username": self.ticket_data["username"], - "password": self.ticket_data["password"], - "cluster_name": self.ticket_data["cluster_name"], - "http_port": self.ticket_data["http_port"], - "query_port": self.ticket_data["query_port"], - "master_fe_ip": self.ticket_data["master_fe_ip"], - "operation": DorisMetaOperation.Add.value, - "host_map": self.ticket_data["host_meta_map"], - }, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), + }, + } + + # 元数据管理:删除节点 + def get_drop_metadata_payload(self, **kwargs) -> dict: + extend_dict = { + "master_fe_ip": self.ticket_data["master_fe_ip"], + "operation": DorisMetaOperation.Drop.value, + "host_map": self.ticket_data["host_meta_map"], + } + return { + "db_type": DBActuatorTypeEnum.Doris.value, + "action": DorisActuatorActionEnum.UpdateMetadata.value, + "payload": { + "general": {}, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), + }, + } + + # 元数据管理:强制删除节点 适用于BE节点 + def get_force_drop_metadata_payload(self, **kwargs) -> dict: + extend_dict = { + "master_fe_ip": self.ticket_data["master_fe_ip"], + "operation": DorisMetaOperation.ForceDrop.value, + "host_map": self.ticket_data["host_meta_map"], + } + return { + "db_type": DBActuatorTypeEnum.Doris.value, + "action": DorisActuatorActionEnum.UpdateMetadata.value, + "payload": { + "general": {}, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), + }, + } + + # 元数据管理:退役BE节点 + def get_decommission_metadata_payload(self, **kwargs) -> dict: + extend_dict = { + "master_fe_ip": self.ticket_data["master_fe_ip"], + "operation": DorisMetaOperation.Decommission.value, + "host_map": self.ticket_data["host_meta_map"], + } + return { + "db_type": DBActuatorTypeEnum.Doris.value, + "action": DorisActuatorActionEnum.UpdateMetadata.value, + "payload": { + "general": {}, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), }, } @@ -239,3 +279,20 @@ def get_clean_data_payload(self, **kwargs) -> dict: }, }, } + + def get_check_decommission_payload(self, **kwargs) -> dict: + """ + 拼接检查节点是否退役的payload参数 + """ + extend_dict = { + "master_fe_ip": self.ticket_data["master_fe_ip"], + "host_map": self.ticket_data["host_meta_map"], + } + return { + "db_type": DBActuatorTypeEnum.Doris.value, + "action": DorisActuatorActionEnum.CheckDecommission.value, + "payload": { + "general": {}, + "extend": dict(**(self.get_common_extend(**kwargs)), **extend_dict), + }, + } diff --git a/dbm-ui/backend/flow/utils/doris/doris_db_meta.py b/dbm-ui/backend/flow/utils/doris/doris_db_meta.py index dea3fe306a..c38fc961ce 100644 --- a/dbm-ui/backend/flow/utils/doris/doris_db_meta.py +++ b/dbm-ui/backend/flow/utils/doris/doris_db_meta.py @@ -158,3 +158,11 @@ def doris_disable(self) -> bool: def doris_enable(self) -> bool: api.cluster.doris.enable(self.ticket_data["cluster_id"]) return True + + def doris_shrink(self) -> bool: + storage_instances = self.__generate_storage_instance() + api.cluster.doris.shrink( + cluster_id=self.ticket_data["cluster_id"], + storages=storage_instances, + ) + return True diff --git a/dbm-ui/backend/flow/views/doris_replace.py b/dbm-ui/backend/flow/views/doris_replace.py new file mode 100644 index 0000000000..0312e777c2 --- /dev/null +++ b/dbm-ui/backend/flow/views/doris_replace.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging + +from django.utils.translation import ugettext as _ +from rest_framework.response import Response + +from backend.flow.engine.controller.doris import DorisController +from backend.flow.views.base import FlowTestView +from backend.utils.basic import generate_root_id + +logger = logging.getLogger("root") + + +class ReplaceDorisSceneApiView(FlowTestView): + """ + api: /apis/v1/flow/scene/replace_doris + params: + { + "bk_biz_id": 2005000002, + "ip_source": "manual_input", + "remark": "替换doris集群", + "ticket_type": "DORIS_REPLACE", + "cluster_id": 124, + "db_app_abbr": "blueking", + "uid": "111", + "created_by": "rtx", + "old_nodes": { + "hot": [ + {"ip": "127.1.1.1", "bk_cloud_id": 0} + ], + "cold": [ + {"ip": "127.1.1.2", "bk_cloud_id": 0} + ], + "observer": [ + {"ip": "127.1.1.4", "bk_cloud_id": 0} + ], + "follower": [ + {"ip": "127.1.1.3", "bk_cloud_id": 0} + ] + }, + "new_nodes": { + "hot": [ + {"ip": "127.1.2.1", "bk_cloud_id": 0} + ], + "cold": [ + {"ip": "127.1.2.2", "bk_cloud_id": 0} + ], + "observer": [ + {"ip": "127.1.1.4", "bk_cloud_id": 0} + ], + "follower": [ + {"ip": "127.1.1.3", "bk_cloud_id": 0} + ] + } + } + + """ + + def post(self, request): + logger.info(_("开始替换Doris集群场景")) + + root_id = generate_root_id() + logger.info("define root_id: {}".format(root_id)) + DorisController(root_id=root_id, ticket_data=request.data).doris_replace_scene() + return Response({"root_id": root_id}) diff --git a/dbm-ui/backend/flow/views/doris_shrink.py b/dbm-ui/backend/flow/views/doris_shrink.py new file mode 100644 index 0000000000..adfc2215a2 --- /dev/null +++ b/dbm-ui/backend/flow/views/doris_shrink.py @@ -0,0 +1,57 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging + +from django.utils.translation import ugettext as _ +from rest_framework.response import Response + +from backend.flow.engine.controller.doris import DorisController +from backend.flow.views.base import FlowTestView +from backend.utils.basic import generate_root_id + +logger = logging.getLogger("root") + + +class ShrinkDorisSceneApiView(FlowTestView): + """ + api: /apis/v1/flow/scene/shrink_doris + params: + { + "bk_biz_id": 2005000194, + "ip_source": "manual_input", + "remark": "测试缩容doris集群", + "ticket_type": "DORIS_SHRINK", + "cluster_id": 124, + "db_app_abbr": "blueking", + "uid": "111", + "created_by": "rtx", + "nodes": { + "hot": [ + {"ip": "127.1.1.1", "bk_cloud_id": 0} + ], + "cold": [ + {"ip": "127.1.1.2", "bk_cloud_id": 0} + ], + "observer": [ + {"ip": "127.1.1.3", "bk_cloud_id": 0} + ] + } + } + + """ + + def post(self, request): + logger.info(_("开始缩容Doris集群场景")) + + root_id = generate_root_id() + logger.info("define root_id: {}".format(root_id)) + DorisController(root_id=root_id, ticket_data=request.data).doris_shrink_scene() + return Response({"root_id": root_id})