From 30e2428695fafc5318f80cbc24083d46e36dfde3 Mon Sep 17 00:00:00 2001
From: yuanruji
Date: Wed, 18 Sep 2024 16:39:52 +0800
Subject: [PATCH] feat(backend): non-standby slaves upgrade subflow #6804

---
 .../mysql/mysql_non_standby_slaves_upgrade.py  | 443 ++++++++++++++++++
 dbm-ui/backend/flow/engine/controller/mysql.py |   8 +
 dbm-ui/backend/flow/urls.py                    |   7 +-
 dbm-ui/backend/flow/views/mysql_upgrade.py     |  15 +
 4 files changed, 472 insertions(+), 1 deletion(-)
 create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py

diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py
new file mode 100644
index 0000000000..9da6cf7c66
--- /dev/null
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py
@@ -0,0 +1,443 @@
+# -*- coding: utf-8 -*-
+"""
+TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
+Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
+Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at https://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+import copy
+import logging.config
+from dataclasses import asdict
+from typing import Dict, Optional
+
+from django.utils.translation import ugettext as _
+
+from backend.configuration.constants import DBType
+from backend.constants import IP_PORT_DIVIDER
+from backend.db_meta.enums import InstanceInnerRole, InstanceStatus
+from backend.db_meta.models import Cluster, StorageInstance
+from backend.db_package.models import Package
+from backend.flow.consts import MediumEnum
+from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
+from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
+from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_tendb_ha_entry
+from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import install_mysql_in_cluster_sub_flow
+from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config
+from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow
+from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow
+from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow
+from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow
+from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent
+from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
+from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
+from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent
+from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
+from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs
+from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset
+from backend.flow.utils.mysql.mysql_act_dataclass import (
+    ClearMachineKwargs,
+    DBMetaOPKwargs,
+    DownloadMediaKwargs,
+    ExecActuatorKwargs,
+)
+from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
+from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
+from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
+from backend.ticket.builders.common.constants import MySQLBackupSource
+
+logger = logging.getLogger("flow")
+
+
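+# Flow layout (one sub-flow per entry in the ticket "infos", run in parallel):
+#   1. install the new mysql instances on the new slave host
+#   2. restore data from local or remote backups and attach replication
+#   3. unless add_slave_only is set: switch cluster entries to the new host
+#   4. unless add_slave_only is set: uninstall the old slave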
+class MySQLNonStandbySlavesUpgradeFlow(object):
+    """
+    Upgrade the non-standby slaves of one-master-multiple-slaves clusters
+    """
+
+    def __init__(self, root_id: str, ticket_data: Optional[Dict]):
+        """
+        @param root_id: root_id of the flow definition
+        @param ticket_data: parameters passed in by the ticket, as a dict
+        """
+        self.root_id = root_id
+        self.ticket_data = ticket_data
+        self.data = {}
+        # only add the new slave: no switchover, no account cloning
+        self.add_slave_only = self.ticket_data.get("add_slave_only", False)
+
+    def upgrade_flow(self):
+        """
+        {
+            "uid": "2022051612120001",
+            "created_by": "xxxx",
+            "bk_biz_id": "152",
+            "module": 1,
+            "ticket_type": "MYSQL_RESTORE_SLAVE",
+            "backup_source": "local",
+            "infos": [
+                {
+                    "cluster_id": "1001",
+                    "pkg_id": 123,
+                    "old_slave": {
+                        "bk_biz_id": 200005000,
+                        "bk_cloud_id": 0,
+                        "bk_host_id": 1,
+                        "ip": "1.1.1.1",
+                        "port": 20000
+                    },
+                    "new_slave": {
+                        "bk_biz_id": 200005000,
+                        "bk_cloud_id": 0,
+                        "bk_host_id": 1,
+                        "ip": "1.1.1.2"
+                    }
+                }
+            ]
+        }
+        """
+        cluster_ids = [info["cluster_id"] for info in self.ticket_data["infos"]]
+
+        p = Builder(
+            root_id=self.root_id,
+            data=copy.deepcopy(self.ticket_data),
+            need_random_pass_cluster_ids=list(set(cluster_ids)),
+        )
+        subflows = []
+        for info in self.ticket_data["infos"]:
+            cluster = Cluster.objects.get(id=info["cluster_id"])
+            old_slave_ip = info["old_slave"]["ip"]
+            bk_cloud_id = info["old_slave"]["bk_cloud_id"]
+            # every storage instance found on the old slave host is rebuilt on the new host
+            slave_storage = StorageInstance.objects.filter(machine__ip=old_slave_ip, machine__bk_cloud_id=bk_cloud_id)
+            ports = [ins.port for ins in slave_storage]
+
+            relation_cluster_ids = [info["cluster_id"]]
+
+            subflow = non_standby_slaves_upgrade_subflow(
+                uid=str(self.ticket_data["uid"]),
+                root_id=self.root_id,
+                cluster=cluster,
+                new_slave=info["new_slave"],
+                old_slave=info["old_slave"],
+                ports=ports,
+                add_slave_only=self.add_slave_only,
+                relation_cluster_ids=relation_cluster_ids,
+                pkg_id=info["pkg_id"],
+                backup_source=self.ticket_data["backup_source"],
+            )
+            subflows.append(subflow)
+
+        p.add_parallel_sub_pipeline(sub_flows=subflows)
+
+        p.run_pipeline(init_trans_data_class=ClusterInfoContext(), is_drop_random_user=True)
+
+
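+# One invocation of this sub-flow migrates a single old_slave -> new_slave pair.
+# `ports` lists every storage instance port found on the old slave host, so a
+# multi-instance host is rebuilt with the same port layout on the new machine.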
+def non_standby_slaves_upgrade_subflow(
+    uid: str,
+    root_id: str,
+    cluster: Cluster,
+    new_slave: dict,
+    old_slave: dict,
+    ports: list,
+    add_slave_only: bool,
+    relation_cluster_ids: list,
+    pkg_id: int,
+    backup_source: str,
+):
+    """
+    Sub-flow that upgrades one non-standby slave of a one-master-multiple-slaves cluster
+    """
+    parent_global_data = {"uid": uid, "root_id": root_id}
+    sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)
+    new_slave_ip = new_slave["ip"]
+    bk_host_ids = [new_slave["bk_host_id"]]
+    old_slave_ip = old_slave["ip"]
+    pkg = Package.objects.get(id=pkg_id, pkg_type=MediumEnum.MySQL, db_type=DBType.MySQL)
+    db_config = get_instance_config(cluster.bk_cloud_id, old_slave_ip, ports=ports)
+    # the restore steps need the cluster charset; read it from the db module config
+    charset, _db_version = get_version_and_charset(
+        cluster.bk_biz_id, db_module_id=cluster.db_module_id, cluster_type=cluster.cluster_type
+    )
+
+    # install mysql on the new slave host
+    install_sub_pipeline = build_install_sub_pipeline(
+        uid, root_id, cluster, new_slave_ip, ports, bk_host_ids, db_config, pkg_id, pkg.name, relation_cluster_ids
+    )
+    sub_pipeline.add_sub_pipeline(sub_flow=install_sub_pipeline)
+
+    if backup_source == MySQLBackupSource.LOCAL:
+        # restore from a local backup
+        sync_data_sub_pipeline_list = build_data_repl_from_local_backup(
+            root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset
+        )
+        sub_pipeline.add_parallel_sub_pipeline(sub_flows=sync_data_sub_pipeline_list)
+    elif backup_source == MySQLBackupSource.REMOTE:
+        # restore from a backup stored in the remote backup system
+        sync_data_sub_pipeline_list = build_sync_data_sub_pipelines(
+            root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset
+        )
+        sub_pipeline.add_parallel_sub_pipeline(sub_flows=sync_data_sub_pipeline_list)
+
+    if not add_slave_only:
+        # switch the cluster entries over to the new slave
+        switch_sub_pipeline_list = build_switch_sub_pipelines(
+            root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip
+        )
+        sub_pipeline.add_parallel_sub_pipeline(sub_flows=switch_sub_pipeline_list)
+
+        # the old slave is only uninstalled after the entries have been switched away
+        uninstall_svr_sub_pipeline = build_uninstall_sub_pipeline(
+            root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster, ports
+        )
+        sub_pipeline.add_sub_pipeline(sub_flow=uninstall_svr_sub_pipeline)
+
+    return sub_pipeline.build_sub_process(sub_name=_("集群{}非standby slave升级".format(cluster.name)))
+
+
+def build_install_sub_pipeline(
+    uid, root_id, cluster, new_slave_ip, ports, bk_host_ids, db_config, pkg_id, pkg_name, relation_cluster_ids
+):
+    install_sub_pipeline = SubBuilder(root_id=root_id, data={"uid": uid, "root_id": root_id})
+
+    install_sub_pipeline.add_sub_pipeline(
+        sub_flow=install_mysql_in_cluster_sub_flow(
+            uid=uid,
+            root_id=root_id,
+            cluster=cluster,
+            new_mysql_list=[new_slave_ip],
+            install_ports=ports,
+            bk_host_ids=bk_host_ids,
+            db_config=db_config,
+            pkg_id=pkg_id,
+        )
+    )
+
+    cluster_info = {
+        "install_ip": new_slave_ip,
+        "cluster_ids": relation_cluster_ids,
+        "package": pkg_name,
+    }
+
+    install_sub_pipeline.add_act(
+        act_name=_("写入初始化实例的db_meta元信息"),
+        act_component_code=MySQLDBMetaComponent.code,
+        kwargs=asdict(
+            DBMetaOPKwargs(
+                db_meta_class_func=MySQLDBMeta.slave_recover_add_instance.__name__,
+                cluster=copy.deepcopy(cluster_info),
+                is_update_trans_data=False,
+            )
+        ),
+    )
+
+    install_sub_pipeline.add_act(
+        act_name=_("安装backup-client工具"),
+        act_component_code=DownloadBackupClientComponent.code,
+        kwargs=asdict(
+            DownloadBackupClientKwargs(
+                bk_cloud_id=cluster.bk_cloud_id,
+                bk_biz_id=int(cluster.bk_biz_id),
+                download_host_list=[new_slave_ip],
+            )
+        ),
+    )
+
+    exec_act_kwargs = ExecActuatorKwargs(
+        cluster=cluster_info,
+        bk_cloud_id=cluster.bk_cloud_id,
+        cluster_type=cluster.cluster_type,
+        get_mysql_payload_func=MysqlActPayload.get_install_tmp_db_backup_payload.__name__,
+        exec_ip=[new_slave_ip],
+    )
+    install_sub_pipeline.add_act(
+        act_name=_("安装临时备份程序"),
+        act_component_code=ExecuteDBActuatorScriptComponent.code,
+        kwargs=asdict(exec_act_kwargs),
+    )
+
+    return install_sub_pipeline.build_sub_process(sub_name=_("安装mysql实例"))
+
+
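+# Remote-backup restore: the cluster_info keys below (master/slave endpoints,
+# file_target_path, charset) are consumed by slave_recover_sub_flow, which pulls
+# the backup from the backup system, restores it and attaches replication.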
f"/data/dbbak/{root_id}/{master.port}", + "charset": parent_global_data, + "change_master_force": True, + } + sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=slave_recover_sub_flow( + root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster_info + ) + ) + sync_data_sub_pipeline.add_act( + act_name=_("同步完毕,写入主从关系,设置节点为running状态"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, + cluster=cluster_info, + is_update_trans_data=True, + ) + ), + ) + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + return sync_data_sub_pipeline_list + + +def build_data_repl_from_local_backup( + root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset +): + sync_data_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + cluster_model = Cluster.objects.get(id=cluster_id) + master = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + cluster = { + "mysql_port": master.port, + "cluster_id": cluster_model.id, + "cluster_type": cluster.cluster_type, + "master_ip": master.machine.ip, + "master_port": master.port, + "new_slave_ip": new_slave_ip, + "new_slave_port": master.port, + "bk_cloud_id": cluster_model.bk_cloud_id, + "file_target_path": f"/data/dbbak/{root_id}/{master.port}", + "charset": charset, + "change_master_force": True, + "change_master": True, + } + sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + # 获取本地备份并恢复 + inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)] + non_stand_by_slaves = cluster_model.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, + is_stand_by=False, + status=InstanceStatus.RUNNING.value, + ).exclude(machine__ip__in=[new_slave_ip]) + if len(non_stand_by_slaves) > 0: + inst_list.append( + "{}{}{}".format(non_stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, non_stand_by_slaves[0].port) + ) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_data_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, + ) + ) + # 恢复完毕的时候 slave 状态改为running + sync_data_sub_pipeline.add_act( + act_name=_("同步完毕,写入主从关系,设置节点为running状态"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, + cluster=cluster, + is_update_trans_data=True, + ) + ), + ) + + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + return sync_data_sub_pipeline_list + + +def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip): + switch_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + cluster_model = Cluster.objects.get(id=cluster_id) + switch_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + switch_sub_pipeline.add_sub_pipeline( + sub_flow=slave_migrate_switch_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster_model, + old_slave_ip=old_slave_ip, + new_slave_ip=new_slave_ip, + ) + ) + domain_map = get_tendb_ha_entry(cluster_model.id) + cluster_info = { + "slave_domain": 
+def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip):
+    switch_sub_pipeline_list = []
+    for cluster_id in relation_cluster_ids:
+        cluster_model = Cluster.objects.get(id=cluster_id)
+        switch_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data))
+        switch_sub_pipeline.add_sub_pipeline(
+            sub_flow=slave_migrate_switch_sub_flow(
+                root_id=root_id,
+                ticket_data=copy.deepcopy(parent_global_data),
+                cluster=cluster_model,
+                old_slave_ip=old_slave_ip,
+                new_slave_ip=new_slave_ip,
+            )
+        )
+        domain_map = get_tendb_ha_entry(cluster_model.id)
+        cluster_info = {
+            "slave_domain": domain_map[old_slave_ip],
+            "new_slave_ip": new_slave_ip,
+            "old_slave_ip": old_slave_ip,
+            "cluster_id": cluster_model.id,
+        }
+        switch_sub_pipeline.add_act(
+            act_name=_("slave切换完毕,修改集群 {} 数据".format(cluster_model.id)),
+            act_component_code=MySQLDBMetaComponent.code,
+            kwargs=asdict(
+                DBMetaOPKwargs(
+                    db_meta_class_func=MySQLDBMeta.mysql_restore_slave_change_cluster_info.__name__,
+                    cluster=cluster_info,
+                    is_update_trans_data=True,
+                )
+            ),
+        )
+        switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换到新从节点")))
+    return switch_sub_pipeline_list
+
+
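+# Uninstall order: drop the db_meta records first so the cluster stops referencing
+# the old slave, then push the actuator package, clear the machine config and
+# finally remove the instances.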
+def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster, ports):
+    uninstall_svr_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data))
+    cluster_info = {"uninstall_ip": old_slave_ip, "cluster_ids": relation_cluster_ids}
+
+    uninstall_svr_sub_pipeline.add_act(
+        act_name=_("卸载实例前先删除元数据"),
+        act_component_code=MySQLDBMetaComponent.code,
+        kwargs=asdict(
+            DBMetaOPKwargs(
+                db_meta_class_func=MySQLDBMeta.slave_recover_del_instance.__name__,
+                is_update_trans_data=True,
+                cluster=cluster_info,
+            )
+        ),
+    )
+
+    uninstall_svr_sub_pipeline.add_act(
+        act_name=_("下发db-actuator到节点{}".format(old_slave_ip)),
+        act_component_code=TransFileComponent.code,
+        kwargs=asdict(
+            DownloadMediaKwargs(
+                bk_cloud_id=cluster.bk_cloud_id,
+                exec_ip=old_slave_ip,
+                file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
+            )
+        ),
+    )
+
+    uninstall_svr_sub_pipeline.add_act(
+        act_name=_("清理机器配置"),
+        act_component_code=MySQLClearMachineComponent.code,
+        kwargs=asdict(
+            ClearMachineKwargs(
+                exec_ip=old_slave_ip,
+                bk_cloud_id=cluster.bk_cloud_id,
+            )
+        ),
+    )
+
+    uninstall_svr_sub_pipeline.add_sub_pipeline(
+        sub_flow=uninstall_instance_sub_flow(
+            root_id=root_id,
+            ticket_data=copy.deepcopy(parent_global_data),
+            ip=old_slave_ip,
+            ports=ports,
+        )
+    )
+
+    return uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(old_slave_ip)))
diff --git a/dbm-ui/backend/flow/engine/controller/mysql.py b/dbm-ui/backend/flow/engine/controller/mysql.py
index a64f64f9f9..108c02d178 100644
--- a/dbm-ui/backend/flow/engine/controller/mysql.py
+++ b/dbm-ui/backend/flow/engine/controller/mysql.py
@@ -34,6 +34,7 @@
 from backend.flow.engine.bamboo.scene.mysql.mysql_master_slave_switch import MySQLMasterSlaveSwitchFlow
 from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_flow import MySQLMigrateClusterFlow
 from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_remote_flow import MySQLMigrateClusterRemoteFlow
+from backend.flow.engine.bamboo.scene.mysql.mysql_non_standby_slaves_upgrade import MySQLNonStandbySlavesUpgradeFlow
 from backend.flow.engine.bamboo.scene.mysql.mysql_open_area_flow import MysqlOpenAreaFlow
 from backend.flow.engine.bamboo.scene.mysql.mysql_partition import MysqlPartitionFlow
 from backend.flow.engine.bamboo.scene.mysql.mysql_partition_cron import MysqlPartitionCronFlow
@@ -674,3 +675,10 @@
     def push_peripheral_config_scene(self):
         flow = MySQLPushPeripheralConfigFlow(root_id=self.root_id, data=self.ticket_data)
         flow.push_config()
+
+    def non_standby_slaves_upgrade_scene(self):
+        """
+        Upgrade non-standby slaves
+        """
+        flow = MySQLNonStandbySlavesUpgradeFlow(root_id=self.root_id, ticket_data=self.ticket_data)
+        flow.upgrade_flow()
diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py
index cec16c0499..5ce3551d5c 100644
--- a/dbm-ui/backend/flow/urls.py
+++ b/dbm-ui/backend/flow/urls.py
@@ -139,7 +139,11 @@
 )
 from backend.flow.views.mysql_single_rename_database import MySQLSingleRenameDatabaseView
 from backend.flow.views.mysql_single_truncate_data import MySQLSingleTruncateDataView
-from backend.flow.views.mysql_upgrade import MigrateUpgradeMySQLSceneApiView, UpgradeMySQLSceneApiView
+from backend.flow.views.mysql_upgrade import (
+    MigrateUpgradeMySQLSceneApiView,
+    NonStandbySlavesUpgradeMySQLSceneApiView,
+    UpgradeMySQLSceneApiView,
+)
 from backend.flow.views.name_service import (
     ClbCreateSceneApiView,
     ClbDeleteSceneApiView,
@@ -340,6 +344,7 @@
     url(r"^scene/upgrade_mysql_proxy$", UpgradeMySQLProxySceneApiView.as_view()),
     url(r"^scene/upgrade_mysql$", UpgradeMySQLSceneApiView.as_view()),
     url(r"^scene/migrate_upgrade_mysql$", MigrateUpgradeMySQLSceneApiView.as_view()),
+    url(r"^scene/non_standby_slaves_upgrade_mysql$", NonStandbySlavesUpgradeMySQLSceneApiView.as_view()),
     # mysql
     url(r"^scene/dbconsole_dump$", DbConsoleDumpApiView.as_view()),
     url(r"^scene/install_mysql_apply$", InstallMySQLSingleSceneApiView.as_view()),
diff --git a/dbm-ui/backend/flow/views/mysql_upgrade.py b/dbm-ui/backend/flow/views/mysql_upgrade.py
index 9a830b7acf..aabec863a3 100644
--- a/dbm-ui/backend/flow/views/mysql_upgrade.py
+++ b/dbm-ui/backend/flow/views/mysql_upgrade.py
@@ -49,3 +49,18 @@ def post(request):
         test = MySQLController(root_id=root_id, ticket_data=request.data)
         test.mysql_migrate_upgrade_scene()
         return Response({"root_id": root_id})
+
+
+class NonStandbySlavesUpgradeMySQLSceneApiView(FlowTestView):
+    """
+    api: /apis/v1/flow/scene/non_standby_slaves_upgrade_mysql
+    """
+
+    @staticmethod
+    def post(request):
+        logger.info(_("开始测试迁移升级非standby mysql实例场景"))
+        root_id = generate_root_id()
+        logger.info("define root_id: {}".format(root_id))
+        test = MySQLController(root_id=root_id, ticket_data=request.data)
+        test.non_standby_slaves_upgrade_scene()
+        return Response({"root_id": root_id})