From 3c7bca09d2c8e5838ad02153ed170f1bee127ba2 Mon Sep 17 00:00:00 2001 From: yuanruji Date: Wed, 18 Sep 2024 16:39:52 +0800 Subject: [PATCH] feat(backend): standby slaves upgrade subflow #6804 --- .../db_services/mysql/toolbox/serializers.py | 15 + .../backend/db_services/mysql/toolbox/urls.py | 7 +- .../db_services/mysql/toolbox/views.py | 45 +- .../mysql_migrate_cluster_remote_flow.py | 1 - .../mysql/mysql_non_standby_slaves_upgrade.py | 459 ++++++++++++++++++ .../bamboo/scene/mysql/mysql_upgrade.py | 22 +- .../backend/flow/engine/controller/mysql.py | 8 + dbm-ui/backend/flow/urls.py | 7 +- .../flow/utils/mysql/mysql_act_playload.py | 123 +++-- dbm-ui/backend/flow/views/mysql_upgrade.py | 15 + 10 files changed, 649 insertions(+), 53 deletions(-) create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py diff --git a/dbm-ui/backend/db_services/mysql/toolbox/serializers.py b/dbm-ui/backend/db_services/mysql/toolbox/serializers.py index 933333fad4..e8600457cd 100644 --- a/dbm-ui/backend/db_services/mysql/toolbox/serializers.py +++ b/dbm-ui/backend/db_services/mysql/toolbox/serializers.py @@ -52,3 +52,18 @@ class TendbhaTransferToOtherBizFlowBuilder(BaseTendbTicketFlowBuilder): inner_flow_builder = TendbhaTransferToOtherBizFlowParamBuilder inner_flow_name = _("TenDBHa集群迁移到其他业务") retry_type = FlowRetryType.MANUAL_RETRY + + +class TendbhaAddSlaveDomainSerializer(serializers.Serializer): + cluster_id = serializers.IntegerField(help_text=_("集群id")) + slave_ip = serializers.CharField(help_text=_("slave ip")) + slave_port = serializers.IntegerField(help_text=_("slave port")) + domain_name = serializers.CharField(help_text=_("slave domain")) + + class Meta: + swagger_schema_fields = { + "bk_biz_id": 11, + "slave_ip": "1.1.1.1", + "slave_port": 3306, + "domain_name": "", + } diff --git a/dbm-ui/backend/db_services/mysql/toolbox/urls.py b/dbm-ui/backend/db_services/mysql/toolbox/urls.py index c1e856deef..3b6b6b5d3c 100644 --- a/dbm-ui/backend/db_services/mysql/toolbox/urls.py +++ b/dbm-ui/backend/db_services/mysql/toolbox/urls.py @@ -11,11 +11,16 @@ from rest_framework.routers import DefaultRouter -from backend.db_services.mysql.toolbox.views import TendbhaTransferToOtherBizViewSet, ToolboxViewSet +from backend.db_services.mysql.toolbox.views import ( + TendbHaSlaveInstanceAddDomainSet, + TendbhaTransferToOtherBizViewSet, + ToolboxViewSet, +) router = DefaultRouter(trailing_slash=True) router.register(r"toolbox", ToolboxViewSet, basename="toolbox") router.register(r"toolbox", TendbhaTransferToOtherBizViewSet, basename="toolbox") +router.register(r"toolbox", TendbHaSlaveInstanceAddDomainSet, basename="toolbox") urlpatterns = [] urlpatterns += router.urls diff --git a/dbm-ui/backend/db_services/mysql/toolbox/views.py b/dbm-ui/backend/db_services/mysql/toolbox/views.py index 241f3072cc..15f0b12b8e 100644 --- a/dbm-ui/backend/db_services/mysql/toolbox/views.py +++ b/dbm-ui/backend/db_services/mysql/toolbox/views.py @@ -9,6 +9,7 @@ specific language governing permissions and limitations under the License. """ import logging +import traceback from typing import Any from django.utils.translation import ugettext_lazy as _ @@ -19,13 +20,15 @@ from backend.bk_web.swagger import common_swagger_auto_schema from backend.components import DBConfigApi from backend.components.dbconfig.constants import FormatType, LevelName -from backend.db_meta.enums import ClusterType -from backend.db_meta.models import Cluster, DBModule +from backend.db_meta.enums import ClusterEntryRole, ClusterEntryType, ClusterType, InstanceInnerRole +from backend.db_meta.models import Cluster, ClusterEntry, DBModule from backend.db_services.mysql.toolbox.handlers import ToolboxHandler from backend.db_services.mysql.toolbox.serializers import ( QueryPkgListByCompareVersionSerializer, + TendbhaAddSlaveDomainSerializer, TendbhaTransferToOtherBizSerializer, ) +from backend.flow.utils.dns_manage import DnsManage from backend.iam_app.handlers.drf_perm.base import DBManagePermission from backend.ticket.constants import TicketType from backend.ticket.models import Ticket @@ -52,6 +55,44 @@ def query_higher_version_pkg_list(self, request, **kwargs): return Response(ToolboxHandler().query_higher_version_pkg_list(cluster_id, higher_major_version)) +class TendbHaSlaveInstanceAddDomainSet(viewsets.SystemViewSet): + action_permission_map = {} + default_permission_class = [DBManagePermission()] + + @common_swagger_auto_schema( + operation_summary=_("给从库添加域名"), + request_body=TendbhaAddSlaveDomainSerializer(), + tags=[SWAGGER_TAG], + ) + @action(methods=["POST"], detail=False, serializer_class=TendbhaAddSlaveDomainSerializer) + def slave_ins_add_domain(self, request, **kwargs): + data = self.params_validate(self.get_serializer_class()) + cluster_id = data["cluster_id"] + domain = data["domain_name"] + slave_ip = data["slave_ip"] + slave_port = data["slave_port"] + if ClusterEntry.objects.filter(cluster_entry_type=ClusterEntryType.DNS.value, entry=domain).exists(): + return Response({"result": False, "message": _("{}域名已经存在".format(domain))}) + cluster_obj = Cluster.objects.get(id=cluster_id) + cluster_entry = ClusterEntry.objects.create( + cluster=cluster_obj, + cluster_entry_type=ClusterEntryType.DNS.value, + entry=domain, + role=ClusterEntryRole.SLAVE_ENTRY.value, + ) + dns_manage = DnsManage(bk_biz_id=cluster_obj.bk_biz_id, bk_cloud_id=cluster_obj.bk_cloud_id) + slave_ins = cluster_obj.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, machine__ip=slave_ip, port=slave_port + ) + cluster_entry.storageinstance_set.add(*slave_ins) + try: + dns_manage.create_domain(instance_list=["{}#{}".format(slave_ip, str(slave_port))], add_domain_name=domain) + except Exception as e: + logger.error(traceback.format_exc()) + return Response({"result": False, "message": _("添加dns记录失败{}".format(e))}) + return Response({"result": True, "message": _("success")}) + + class TendbhaTransferToOtherBizViewSet(viewsets.SystemViewSet): action_permission_map = {} default_permission_class = [DBManagePermission()] diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py index 329da94339..49d71a13bf 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_migrate_cluster_remote_flow.py @@ -416,7 +416,6 @@ def migrate_cluster_flow(self, use_for_upgrade=False): uninstall_svr_sub_pipeline_list.append( uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(ip))) ) - # 安装实例 tendb_migrate_pipeline.add_parallel_sub_pipeline(sub_flow_list=install_sub_pipeline_list) # 同步配置 diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py new file mode 100644 index 0000000000..dfb802baa5 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py @@ -0,0 +1,459 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import copy +import logging.config +from dataclasses import asdict +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.configuration.constants import DBType +from backend.constants import IP_PORT_DIVIDER +from backend.db_meta.enums import InstanceInnerRole, InstanceStatus +from backend.db_meta.models import Cluster +from backend.db_package.models import Package +from backend.flow.consts import MediumEnum +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_tendb_ha_entry +from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import install_mysql_in_cluster_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.get_master_config import get_instance_config +from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_restore_data_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.recover_slave_instance import slave_recover_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.slave_recover_switch import slave_migrate_switch_sub_flow +from backend.flow.engine.bamboo.scene.mysql.common.uninstall_instance import uninstall_instance_sub_flow +from backend.flow.plugins.components.collections.common.download_backup_client import DownloadBackupClientComponent +from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent +from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent +from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent +from backend.flow.utils.common_act_dataclass import DownloadBackupClientKwargs +from backend.flow.utils.mysql.common.mysql_cluster_info import get_ports, get_version_and_charset +from backend.flow.utils.mysql.mysql_act_dataclass import ( + ClearMachineKwargs, + DBMetaOPKwargs, + DownloadMediaKwargs, + ExecActuatorKwargs, +) +from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload +from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext +from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta +from backend.ticket.builders.common.constants import MySQLBackupSource + +logger = logging.getLogger("flow") + + +class MySQLNonStandbySlavesUpgradeFlow(object): + """ + 一直多从非stanby slaves升级 + """ + + def __init__(self, root_id: str, ticket_data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param tick_data : 单据传递过来的参数列表,是dict格式 + """ + self.root_id = root_id + self.ticket_data = ticket_data + self.data = {} + # 仅添加从库。不切换。不复制账号 + self.add_slave_only = self.ticket_data.get("add_slave_only", False) + + def upgrade(self): + """ + { + "uid": "2022051612120001", + "created_by": "xxxx", + "bk_biz_id": "152", + "ticket_type": "MYSQL_RESTORE_SLAVE", + "backup_source": "local", + "infos": [ + { + "cluster_ids": [1001,1002], + "pkg_id": 123, + "new_db_module_id: "578", + "old_slave": { + "bk_biz_id": 200005000, + "bk_cloud_id": 0, + "bk_host_id": 1, + "ip": "1.1.1.1", + "port": 20000 + }, + "new_slave": { + "bk_biz_id": 200005000, + "bk_cloud_id": 0, + "bk_host_id": 1, + "ip": "1.1.1.2" + } + } + ] + } + """ + cluster_ids = [] + for info in self.ticket_data["infos"]: + cluster_ids.extend(info["cluster_ids"]) + + p = Builder( + root_id=self.root_id, + data=copy.deepcopy(self.ticket_data), + need_random_pass_cluster_ids=list(set(cluster_ids)), + ) + subflows = [] + created_by = self.ticket_data["ticket_type"] + for info in self.ticket_data["infos"]: + cluster = Cluster.objects.get(id=info["cluster_ids"][0]) + ports = get_ports(info["cluster_ids"]) + relation_cluster_ids = [info["cluster_ids"]] + + subflow = non_standby_slaves_upgrade_subflow( + uid=str(self.ticket_data["uid"]), + root_id=self.root_id, + cluster=cluster, + new_slave=info["new_slave"], + old_slave=info["old_slave"], + ports=ports, + add_slave_only=self.add_slave_only, + relation_cluster_ids=relation_cluster_ids, + pkg_id=info["pkg_id"], + new_db_module_id=info["new_db_module_id"], + backup_source=self.ticket_data["backup_source"], + created_by=created_by, + ) + subflows.append(subflow) + + p.add_parallel_sub_pipeline(sub_flows=subflows) + + p.run_pipeline(init_trans_data_class=ClusterInfoContext(), is_drop_random_user=True) + + +def non_standby_slaves_upgrade_subflow( + uid: str, + root_id: str, + cluster: Cluster, + new_slave: dict, + old_slave: dict, + ports: list, + add_slave_only: bool, + relation_cluster_ids: list, + pkg_id: int, + new_db_module_id: int, + backup_source: str, + created_by: str, +): + """ + 一主多从非stanby slaves升级subflow + """ + pkg = Package.objects.get(id=pkg_id, pkg_type=MediumEnum.MySQL, db_type=DBType.MySQL) + charset, _ = get_version_and_charset( + cluster.bk_biz_id, db_module_id=new_db_module_id, cluster_type=cluster.cluster_type + ) + parent_global_data = { + "uid": uid, + "root_id": root_id, + "bk_biz_id": cluster.bk_biz_id, + "bk_cloud_id": cluster.bk_cloud_id, + "db_module_id": new_db_module_id, + "time_zone": cluster.time_zone, + "cluster_type": cluster.cluster_type, + "created_by": created_by, + "package": pkg.name, + "ports": ports, + "charset": charset, + } + + sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) + new_slave_ip = new_slave["ip"] + bk_host_ids = [new_slave["bk_host_id"]] + old_slave_ip = old_slave["ip"] + db_config = get_instance_config(cluster.bk_cloud_id, old_slave_ip, ports=ports) + + # 安装mysql + install_sub_pipeline = build_install_sub_pipeline( + uid, root_id, cluster, new_slave_ip, ports, bk_host_ids, db_config, pkg_id, pkg.name, relation_cluster_ids + ) + sub_pipeline.add_sub_pipeline(sub_flow=install_sub_pipeline) + + if backup_source == MySQLBackupSource.LOCAL: + # 使用本地备份来做迁移 + sync_data_sub_pipeline_list = build_data_repl_from_local_backup( + root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset + ) + sub_pipeline.add_sub_pipeline(sub_flows=sync_data_sub_pipeline_list) + elif backup_source == MySQLBackupSource.REMOTE: + # 使用远程备份来做迁移 + sync_data_sub_pipeline_list = build_sync_data_sub_pipelines( + root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip + ) + sub_pipeline.add_sub_pipeline(sub_flows=sync_data_sub_pipeline_list) + + # 切换到新从节点 + if not add_slave_only: + switch_sub_pipeline_list = build_switch_sub_pipelines( + root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip + ) + sub_pipeline.add_parallel_sub_pipeline(sub_flows=switch_sub_pipeline_list) + + # 卸载旧从节点 + uninstall_svr_sub_pipeline = build_uninstall_sub_pipeline( + root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster, ports + ) + sub_pipeline.add_sub_pipeline(sub_flow=uninstall_svr_sub_pipeline) + + return sub_pipeline + + +def build_install_sub_pipeline( + uid, root_id, cluster, new_slave_ip, ports, bk_host_ids, db_config, pkg_id, pkg_name, relation_cluster_ids +): + install_sub_pipeline = SubBuilder(root_id=root_id, data={"uid": uid, "root_id": root_id}) + + install_sub_pipeline.add_sub_pipeline( + sub_flow=install_mysql_in_cluster_sub_flow( + uid=uid, + root_id=root_id, + cluster=cluster, + new_mysql_list=[new_slave_ip], + install_ports=ports, + bk_host_ids=bk_host_ids, + db_config=db_config, + pkg_id=pkg_id, + ) + ) + + cluster_info = { + "install_ip": new_slave_ip, + "cluster_ids": relation_cluster_ids, + "package": pkg_name, + } + + install_sub_pipeline.add_act( + act_name=_("写入初始化实例的db_meta元信息"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.slave_recover_add_instance.__name__, + cluster=copy.deepcopy(cluster_info), + is_update_trans_data=False, + ) + ), + ) + + install_sub_pipeline.add_act( + act_name=_("安装backup-client工具"), + act_component_code=DownloadBackupClientComponent.code, + kwargs=asdict( + DownloadBackupClientKwargs( + bk_cloud_id=cluster.bk_cloud_id, + bk_biz_id=int(cluster.bk_biz_id), + download_host_list=[new_slave_ip], + ) + ), + ) + + exec_act_kwargs = ExecActuatorKwargs( + cluster=cluster_info, + bk_cloud_id=cluster.bk_cloud_id, + cluster_type=cluster.cluster_type, + get_mysql_payload_func=MysqlActPayload.get_install_tmp_db_backup_payload.__name__, + exec_ip=[new_slave_ip], + ) + install_sub_pipeline.add_act( + act_name=_("安装临时备份程序"), + act_component_code=ExecuteDBActuatorScriptComponent.code, + kwargs=asdict(exec_act_kwargs), + ) + + return install_sub_pipeline + + +def build_sync_data_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip): + sync_data_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + cluster_model = Cluster.objects.get(id=cluster_id) + master = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + cluster_info = { + "mysql_port": master.port, + "cluster_id": cluster_model.id, + "cluster_type": cluster.cluster_type, + "master_ip": master.machine.ip, + "master_port": master.port, + "new_slave_ip": new_slave_ip, + "new_slave_port": master.port, + "bk_cloud_id": cluster_model.bk_cloud_id, + "file_target_path": f"/data/dbbak/{root_id}/{master.port}", + "charset": parent_global_data, + "change_master_force": True, + } + sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=slave_recover_sub_flow( + root_id=root_id, ticket_data=copy.deepcopy(parent_global_data), cluster_info=cluster_info + ) + ) + sync_data_sub_pipeline.add_act( + act_name=_("同步完毕,写入主从关系,设置节点为running状态"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, + cluster=cluster_info, + is_update_trans_data=True, + ) + ), + ) + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + return sync_data_sub_pipeline_list + + +def build_data_repl_from_local_backup( + root_id, parent_global_data, relation_cluster_ids, cluster, new_slave_ip, charset +): + sync_data_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + cluster_model = Cluster.objects.get(id=cluster_id) + master = cluster_model.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + cluster = { + "mysql_port": master.port, + "cluster_id": cluster_model.id, + "cluster_type": cluster.cluster_type, + "master_ip": master.machine.ip, + "master_port": master.port, + "new_slave_ip": new_slave_ip, + "new_slave_port": master.port, + "bk_cloud_id": cluster_model.bk_cloud_id, + "file_target_path": f"/data/dbbak/{root_id}/{master.port}", + "charset": charset, + "change_master_force": True, + "change_master": True, + } + sync_data_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + # 获取本地备份并恢复 + inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)] + non_stand_by_slaves = cluster_model.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, + is_stand_by=False, + status=InstanceStatus.RUNNING.value, + ).exclude(machine__ip__in=[new_slave_ip]) + if len(non_stand_by_slaves) > 0: + inst_list.append( + "{}{}{}".format(non_stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, non_stand_by_slaves[0].port) + ) + sync_data_sub_pipeline.add_sub_pipeline( + sub_flow=mysql_restore_data_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster, + cluster_model=cluster_model, + ins_list=inst_list, + ) + ) + # 恢复完毕的时候 slave 状态改为running + sync_data_sub_pipeline.add_act( + act_name=_("同步完毕,写入主从关系,设置节点为running状态"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_add_slave_info.__name__, + cluster=cluster, + is_update_trans_data=True, + ) + ), + ) + + sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据"))) + return sync_data_sub_pipeline_list + + +def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids, old_slave_ip, new_slave_ip): + switch_sub_pipeline_list = [] + for cluster_id in relation_cluster_ids: + cluster_model = Cluster.objects.get(id=cluster_id) + switch_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + switch_sub_pipeline.add_sub_pipeline( + sub_flow=slave_migrate_switch_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + cluster=cluster_model, + old_slave_ip=old_slave_ip, + new_slave_ip=new_slave_ip, + ) + ) + domain_map = get_tendb_ha_entry(cluster_model.id) + cluster_info = { + "slave_domain": domain_map[old_slave_ip], + "new_slave_ip": new_slave_ip, + "old_slave_ip": old_slave_ip, + "cluster_id": cluster_model.id, + } + switch_sub_pipeline.add_act( + act_name=_("slave切换完毕,修改集群 {} 数据".format(cluster_model.id)), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.mysql_restore_slave_change_cluster_info.__name__, + cluster=cluster_info, + is_update_trans_data=True, + ) + ), + ) + switch_sub_pipeline_list.append(switch_sub_pipeline.build_sub_process(sub_name=_("切换到新从节点"))) + return switch_sub_pipeline_list + + +def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster, ports): + uninstall_svr_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) + cluster_info = {"uninstall_ip": old_slave_ip, "cluster_ids": relation_cluster_ids} + + uninstall_svr_sub_pipeline.add_act( + act_name=_("卸载实例前先删除元数据"), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.slave_recover_del_instance.__name__, + is_update_trans_data=True, + cluster=cluster_info, + ) + ), + ) + + uninstall_svr_sub_pipeline.add_act( + act_name=_("下发db-actor到节点{}".format(old_slave_ip)), + act_component_code=TransFileComponent.code, + kwargs=asdict( + DownloadMediaKwargs( + bk_cloud_id=cluster.bk_cloud_id, + exec_ip=old_slave_ip, + file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(), + ) + ), + ) + + uninstall_svr_sub_pipeline.add_act( + act_name=_("清理机器配置"), + act_component_code=MySQLClearMachineComponent.code, + kwargs=asdict( + ClearMachineKwargs( + exec_ip=old_slave_ip, + bk_cloud_id=cluster.bk_cloud_id, + ) + ), + ) + + uninstall_svr_sub_pipeline.add_sub_pipeline( + sub_flow=uninstall_instance_sub_flow( + root_id=root_id, + ticket_data=copy.deepcopy(parent_global_data), + ip=old_slave_ip, + ports=ports, + ) + ) + + return uninstall_svr_sub_pipeline.build_sub_process(sub_name=_("卸载remote节点{}".format(old_slave_ip))) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_upgrade.py index 8cb0977478..319cd4ea9c 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_upgrade.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_upgrade.py @@ -25,7 +25,6 @@ from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList from backend.flow.engine.bamboo.scene.mysql.common.master_and_slave_switch import master_and_slave_switch -from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_flow import MySQLMigrateClusterFlow from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_remote_flow import MySQLMigrateClusterRemoteFlow from backend.flow.plugins.components.collections.common.pause import PauseComponent from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent @@ -36,7 +35,6 @@ from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta from backend.flow.utils.mysql.mysql_version_parse import get_sub_version_by_pkg_name, mysql_version_parse -from backend.ticket.builders.common.constants import MySQLBackupSource logger = logging.getLogger("flow") @@ -66,9 +64,20 @@ def convert_mysql8_version_num(verno: int) -> int: # MySQLMigrateClusterRemoteFlow: 使用远程备份来恢复 # MySQLMigrateClusterFlow: 使用本地备份来恢复 -class MySQMigrateUpgradeFlow(MySQLMigrateClusterRemoteFlow, MySQLMigrateClusterFlow): +class MySQMigrateUpgradeFlow(MySQLMigrateClusterRemoteFlow): """ 构建mysql主从成对迁移的方式来升级MySQL + # new_non_standy_slave_ip_list:[ + { + "old_slave": {"ip": "127.0.0.2", "bk_cloud_id": 0, "bk_host_id": 1, "bk_biz_id": 2005000002}, + "new_slave": {"ip": "127.0.0.3", "bk_cloud_id": 0, "bk_host_id": 1, "bk_biz_id": 2005000003}, + }, + { + "old_slave": {"ip": "127.0.1.2", "bk_cloud_id": 0, "bk_host_id": 1, "bk_biz_id": 2005000004}, + "new_slave": {"ip": "127.0.1.3", "bk_cloud_id": 0, "bk_host_id": 1, "bk_biz_id": 2005000005}, + } + ] + ] """ def __init__(self, root_id: str, ticket_data: Optional[Dict]): @@ -78,12 +87,7 @@ def __init__(self, root_id: str, ticket_data: Optional[Dict]): def upgrade(self): # 进行模块的版本检查 self.__pre_check() - if self.ticket_data["backup_source"] == MySQLBackupSource.LOCAL: - # 使用本地备份来做迁移 - self.deploy_migrate_cluster_flow(use_for_upgrade=True) - elif self.ticket_data["backup_source"] == MySQLBackupSource.REMOTE: - # 使用远程备份来做迁移 - self.migrate_cluster_flow(use_for_upgrade=True) + self.migrate_cluster_flow(use_for_upgrade=True) def __pre_check(self): for info in self.ticket_data["infos"]: diff --git a/dbm-ui/backend/flow/engine/controller/mysql.py b/dbm-ui/backend/flow/engine/controller/mysql.py index a64f64f9f9..5149a3b8ce 100644 --- a/dbm-ui/backend/flow/engine/controller/mysql.py +++ b/dbm-ui/backend/flow/engine/controller/mysql.py @@ -34,6 +34,7 @@ from backend.flow.engine.bamboo.scene.mysql.mysql_master_slave_switch import MySQLMasterSlaveSwitchFlow from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_flow import MySQLMigrateClusterFlow from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_remote_flow import MySQLMigrateClusterRemoteFlow +from backend.flow.engine.bamboo.scene.mysql.mysql_non_standby_slaves_upgrade import MySQLNonStandbySlavesUpgradeFlow from backend.flow.engine.bamboo.scene.mysql.mysql_open_area_flow import MysqlOpenAreaFlow from backend.flow.engine.bamboo.scene.mysql.mysql_partition import MysqlPartitionFlow from backend.flow.engine.bamboo.scene.mysql.mysql_partition_cron import MysqlPartitionCronFlow @@ -674,3 +675,10 @@ def push_peripheral_config_scene(self): flow = MySQLPushPeripheralConfigFlow(root_id=self.root_id, data=self.ticket_data) flow.push_config() + + def non_standby_slaves_upgrade_scene(self): + """ + 非Standby从库升级 + """ + flow = MySQLNonStandbySlavesUpgradeFlow(root_id=self.root_id, ticket_data=self.ticket_data) + flow.upgrade() diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index cec16c0499..5ce3551d5c 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -139,7 +139,11 @@ ) from backend.flow.views.mysql_single_rename_database import MySQLSingleRenameDatabaseView from backend.flow.views.mysql_single_truncate_data import MySQLSingleTruncateDataView -from backend.flow.views.mysql_upgrade import MigrateUpgradeMySQLSceneApiView, UpgradeMySQLSceneApiView +from backend.flow.views.mysql_upgrade import ( + MigrateUpgradeMySQLSceneApiView, + NonStanbySlavesUpgradeMySQLSceneApiView, + UpgradeMySQLSceneApiView, +) from backend.flow.views.name_service import ( ClbCreateSceneApiView, ClbDeleteSceneApiView, @@ -340,6 +344,7 @@ url(r"^scene/upgrade_mysql_proxy$", UpgradeMySQLProxySceneApiView.as_view()), url(r"^scene/upgrade_mysql$", UpgradeMySQLSceneApiView.as_view()), url(r"^scene/migrate_upgrade_mysql$", MigrateUpgradeMySQLSceneApiView.as_view()), + url(r"^scene/non_stanby_slave_upgrade_mysql$", NonStanbySlavesUpgradeMySQLSceneApiView.as_view()), # mysql url(r"^scene/dbconsole_dump$", DbConsoleDumpApiView.as_view()), url(r"^scene/install_mysql_apply$", InstallMySQLSingleSceneApiView.as_view()), diff --git a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py index 24fa33ce10..d97bb57222 100644 --- a/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py +++ b/dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py @@ -126,47 +126,73 @@ def get_sys_init_payload(self, **kwargs) -> dict: "payload": {"user": self.account["os_mysql_user"], "pwd": self.account["os_mysql_pwd"]}, } - def deal_mysql_config(self, db_version: str, origin_configs: dict, init_configs: dict) -> dict: + def deal_with_upgrade_to_mysql57(self, cfg: dict) -> dict: + del_keys = [ + "secure_auth", + "innodb_additional_mem_pool_size", + "innodb_create_use_gcs_real_format", + "thread_concurrency", + "storage_engine", + "old_passwords", + "innodb_file_io_threads", + ] + for key in del_keys: + if key in cfg["mysqld"]: + del cfg["mysqld"][key] + + # 需要rename key 的配置 + if "thread_cache" in cfg["mysqld"]: + cfg["mysqld"]["thread_cache_size"] = cfg["mysqld"]["thread_cache"] + del cfg["mysqld"]["thread_cache"] + if "key_buffer" in cfg["mysqld"]: + cfg["mysqld"]["key_buffer_size"] = cfg["mysqld"]["key_buffer"] + del cfg["mysqld"]["key_buffer"] + if "log_warnings" in cfg["log_error_verbosity"]: + cfg["mysqld"]["log_error_verbosity"] = "1" + del cfg["mysqld"]["log_warnings"] + # 需要更改val的配置 + cfg["mysqld"]["show_compatibility_56"] = "ON" + cfg["mysqld"]["secure_file_priv"] = "" + cfg["mysqld"]["sync_binlog"] = 0 + return cfg + + def deal_with_upgrade_to_mysql80(self, is_community: bool, cfg: dict) -> dict: """ 处理不同介质的之间的mysql配置 """ - cfg = copy.deepcopy(init_configs) - cfg["mysqld"].update(origin_configs) - - if db_version >= "8.0.0": - if "log_warnings" in cfg["mysqld"]: - value = cfg["mysqld"]["log_warnings"] - if value == "0": - cfg["mysqld"]["log_error_verbosity"] = "1" - elif value == "1": - cfg["mysqld"]["log_error_verbosity"] = "2" - else: - cfg["mysqld"]["log_error_verbosity"] = "3" - del cfg["mysqld"]["log_warnings"] - # mysql8.0 无法识别这些参数 - for key in [ - "innodb_file_format", - "query_cache_size", - "query_cache_type", - "show_compatibility_56", - "query_response_time_stats", - "userstat", - ]: - if key in cfg["mysqld"]: - del cfg["mysqld"][key] - if "thread_handling" in cfg["mysqld"]: - val = cfg["mysqld"]["thread_handling"] - # thread_handling = 2 是tmysql参数。社区版本和txsql 都不能识别 - if val == "1": - cfg["mysqld"]["thread_handling"] = "no-threads" - elif val == "2": - cfg["mysqld"]["thread_handling"] = "one-thread-per-connection" - elif val == "3": - cfg["mysqld"]["thread_handling"] = "loaded-dynamically" + if "log_warnings" in cfg["mysqld"]: + value = cfg["mysqld"]["log_warnings"] + if value == "0": + cfg["mysqld"]["log_error_verbosity"] = "1" + elif value == "1": + cfg["mysqld"]["log_error_verbosity"] = "2" + else: + cfg["mysqld"]["log_error_verbosity"] = "3" + del cfg["mysqld"]["log_warnings"] + # mysql8.0 无法识别这些参数 + for key in [ + "innodb_file_format", + "query_cache_size", + "query_cache_type", + "show_compatibility_56", + "query_response_time_stats", + "userstat", + ]: + if key in cfg["mysqld"]: + del cfg["mysqld"][key] + if "thread_handling" in cfg["mysqld"]: + val = cfg["mysqld"]["thread_handling"] + # thread_handling = 2 是tmysql参数。社区版本和txsql 都不能识别 + if val == "1": + cfg["mysqld"]["thread_handling"] = "no-threads" + elif val == "2": + cfg["mysqld"]["thread_handling"] = "one-thread-per-connection" + elif val == "3": + cfg["mysqld"]["thread_handling"] = "loaded-dynamically" # 这里应该是社区版本等非Tendb数据库的版本需要处理的参数 # 介质管理暂未记录介质来源属性 - if db_version >= "8.0.30": + if is_community: for key in [ "log_bin_compress", "relay_log_uncompress", @@ -182,6 +208,24 @@ def deal_mysql_config(self, db_version: str, origin_configs: dict, init_configs: del cfg["mysqld"][key] return cfg + def deal_mysql_config(self, db_version: str, origin_configs: dict, init_configs: dict) -> dict: + """ + 处理不同介质的之间的mysql配置 + """ + cfg = copy.deepcopy(init_configs) + cfg["mysqld"].update(origin_configs) + if db_version >= "5.7.0": + cfg = self.deal_with_upgrade_to_mysql57(cfg) + if db_version >= "8.0.0": + # 这里应该是社区版本等非Tendb数据库的版本需要处理的参数 + # 介质管理暂未记录介质来源属性 + is_community = False + if db_version >= "8.0.30": + is_community = True + cfg = self.deal_with_upgrade_to_mysql80(is_community=is_community, cfg=cfg) + + return cfg + def get_install_mysql_payload(self, **kwargs) -> dict: """ 拼接安装MySQL的payload参数, 分别兼容集群申请、集群实例重建、集群实例添加单据的获取方式 @@ -214,10 +258,11 @@ def get_install_mysql_payload(self, **kwargs) -> dict: for port in install_mysql_ports: mysql_config[port] = copy.deepcopy(init_mysql_config[port]) port_str = str(port) - if port_str in old_configs.keys(): - mysql_config[port] = self.deal_mysql_config( - db_version=version_no, init_configs=mysql_config[port], origin_configs=old_configs[port_str] - ) + if self.cluster["is_upgrade"]: + if port_str in old_configs.keys(): + mysql_config[port] = self.deal_mysql_config( + db_version=version_no, init_configs=mysql_config[port], origin_configs=old_configs[port_str] + ) logger.debug("install config:", mysql_config) drs_account, dbha_account = self.get_super_account() diff --git a/dbm-ui/backend/flow/views/mysql_upgrade.py b/dbm-ui/backend/flow/views/mysql_upgrade.py index 9a830b7acf..aabec863a3 100644 --- a/dbm-ui/backend/flow/views/mysql_upgrade.py +++ b/dbm-ui/backend/flow/views/mysql_upgrade.py @@ -49,3 +49,18 @@ def post(request): test = MySQLController(root_id=root_id, ticket_data=request.data) test.mysql_migrate_upgrade_scene() return Response({"root_id": root_id}) + + +class NonStanbySlavesUpgradeMySQLSceneApiView(FlowTestView): + """ + api: /apis/v1/flow/scene/migrate_upgrade_mysql + """ + + @staticmethod + def post(request): + logger.info(_("开始测试迁移升级非standy mysql实例场景")) + root_id = generate_root_id() + logger.info("define root_id: {}".format(root_id)) + test = MySQLController(root_id=root_id, ticket_data=request.data) + test.non_standby_slaves_upgrade_scene() + return Response({"root_id": root_id})