Skip to content

Commit

Permalink
feat: 定点回档到指定集群 #5071
Browse files Browse the repository at this point in the history
feat: 定点回档到指定集群 #5071

feat: 定点回档到指定集群 #5071
  • Loading branch information
zfrendo authored and jinquantianxia committed Jun 27, 2024
1 parent 52e71e7 commit f40f39c
Show file tree
Hide file tree
Showing 12 changed files with 309 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,31 @@ def get_local_backup(instances: list, cluster: Cluster, end_time: str = None):
return max_backup
else:
return None


def check_storage_database(cluster: Cluster, ip: str, port: int) -> bool:
    """Check whether a MySQL storage instance is an empty (idle) instance.

    An instance is considered empty when it contains no schemas besides the
    built-in / system databases listed in the query below.

    @param cluster: cluster the instance belongs to (supplies bk_cloud_id for the RPC)
    @param ip: instance ip
    @param port: instance port
    @return: True when the instance holds no business databases;
        False when the query fails or business databases exist
    """
    query_cmds = """select SCHEMA_NAME from information_schema.schemata where SCHEMA_NAME not in
    ('information_schema','db_infobase','infodba_schema','mysql','test','sys','performance_schema')"""
    res = DRSApi.rpc(
        {
            "addresses": ["{}{}{}".format(ip, IP_PORT_DIVIDER, port)],
            "cmds": [query_cmds],
            "force": False,
            "bk_cloud_id": cluster.bk_cloud_id,
        }
    )
    if res[0]["error_msg"]:
        # Fail safe: a failed query is reported as "not empty" so callers abort.
        logging.error("get databases error %s", res[0]["error_msg"])
        return False
    table_data = res[0]["cmd_results"][0]["table_data"]
    if isinstance(table_data, list) and len(table_data) == 0:
        logging.info(res[0]["cmd_results"])
        return True
    return False
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ def mysql_rollback_data_sub_flow(
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
return sub_pipeline.build_sub_process(sub_name=_("用本地备份恢复数据{}".format(exec_act_kwargs.exec_ip)))
return sub_pipeline.build_sub_process(
sub_name=_("用本地备份恢复数据{}:{}".format(exec_act_kwargs.exec_ip, cluster["rollback_port"]))
)


def mysql_restore_master_slave_sub_flow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,6 @@ def slave_recover_sub_flow(root_id: str, ticket_data: dict, cluster_info: dict):
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
return sub_pipeline.build_sub_process(sub_name=_("RemoteDB从节点重建子流程{}".format(exec_act_kwargs.exec_ip)))
return sub_pipeline.build_sub_process(
sub_name=_("{}:{}从节点重建".format(cluster["new_slave_ip"], cluster["new_slave_port"]))
)
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@ def deploy_restore_local_slave_flow(self):
)
)

# 卸载流程人工确认
tendb_migrate_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})
# 克隆权限
new_slave = "{}{}{}".format(target_slave.machine.ip, IP_PORT_DIVIDER, master.port)
old_master = "{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)
Expand Down Expand Up @@ -541,6 +543,18 @@ def deploy_restore_local_slave_flow(self):
),
)

tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_model.bk_cloud_id,
master_ip_list=None,
slave_ip_list=[target_slave.machine.ip],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBHA.value,
)
)

tendb_migrate_pipeline_list.append(
tendb_migrate_pipeline.build_sub_process(_("slave原地重建{}").format(target_slave.machine.ip))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,8 @@ def restore_local_slave_flow(self):
root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=cluster
)
)

# 卸载流程人工确认
tendb_migrate_pipeline.add_act(act_name=_("人工确认"), act_component_code=PauseComponent.code, kwargs={})
# 克隆权限
new_slave = "{}{}{}".format(target_slave.machine.ip, IP_PORT_DIVIDER, master.port)
old_master = "{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)
Expand Down Expand Up @@ -497,8 +498,20 @@ def restore_local_slave_flow(self):
),
)

tendb_migrate_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=cluster_model.bk_cloud_id,
master_ip_list=None,
slave_ip_list=[target_slave.machine.ip],
root_id=self.root_id,
parent_global_data=copy.deepcopy(self.data),
is_init=True,
cluster_type=ClusterType.TenDBHA.value,
)
)

tendb_migrate_pipeline_list.append(
tendb_migrate_pipeline.build_sub_process(_("slave原地重建{}").format(target_slave.machine.ip))
tendb_migrate_pipeline.build_sub_process(_("{}slave原地重建").format(target_slave.ip_port))
)

tendb_migrate_pipeline_all.add_parallel_sub_pipeline(sub_flow_list=tendb_migrate_pipeline_list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,30 @@
from backend.db_package.models import Package
from backend.flow.consts import InstanceStatus, MediumEnum, RollbackType
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.exceptions import NormalTenDBFlowException
from backend.flow.engine.bamboo.scene.mysql.common.get_local_backup import check_storage_database
from backend.flow.engine.bamboo.scene.mysql.common.mysql_resotre_data_sub_flow import mysql_rollback_data_sub_flow
from backend.flow.engine.bamboo.scene.mysql.mysql_rollback_data_sub_flow import (
rollback_local_and_backupid,
rollback_remote_and_backupid,
rollback_remote_and_time,
)
from backend.flow.engine.bamboo.scene.mysql.mysql_single_apply_flow import MySQLSingleApplyFlow
from backend.flow.engine.bamboo.scene.spider.common.exceptions import NormalSpiderFlowException
from backend.flow.plugins.components.collections.common.pause import PauseComponent
from backend.flow.plugins.components.collections.mysql.clear_machine import MySQLClearMachineComponent
from backend.flow.plugins.components.collections.mysql.dns_manage import MySQLDnsManageComponent
from backend.flow.plugins.components.collections.mysql.exec_actuator_script import ExecuteDBActuatorScriptComponent
from backend.flow.plugins.components.collections.mysql.mysql_db_meta import MySQLDBMetaComponent
from backend.flow.plugins.components.collections.mysql.trans_flies import TransFileComponent
from backend.flow.utils.mysql.common.mysql_cluster_info import get_version_and_charset
from backend.flow.utils.mysql.mysql_act_dataclass import DBMetaOPKwargs, ExecActuatorKwargs, RecycleDnsRecordKwargs
from backend.flow.utils.mysql.mysql_act_dataclass import (
DBMetaOPKwargs,
DownloadMediaKwargs,
ExecActuatorKwargs,
RecycleDnsRecordKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
from backend.flow.utils.mysql.mysql_context_dataclass import ClusterInfoContext
from backend.flow.utils.mysql.mysql_db_meta import MySQLDBMeta
Expand Down Expand Up @@ -96,7 +105,6 @@ def rollback_data_flow(self):
db_module_id=self.data["db_module_id"],
cluster_type=self.data["cluster_type"],
)
# todo 之前安装流程整改为调用统一的单节点安装流程
sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
install_ticket = copy.deepcopy(self.data)
datetime_str = datetime.strftime(datetime.now(), "%Y%m%d%H%M%S%f")
Expand Down Expand Up @@ -254,3 +262,159 @@ def rollback_data_flow(self):

mysql_restore_slave_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipeline_list)
mysql_restore_slave_pipeline.run_pipeline(init_trans_data_class=ClusterInfoContext(), is_drop_random_user=True)

def rollback_to_cluster_flow(self):
    """Point-in-time rollback of data into a designated target cluster.

    For each entry in ``self.ticket_data["infos"]``:
      1. Load the source cluster and its master instance, and merge the
         cluster attributes into the per-entry flow data.
      2. Verify every storage instance of the target rollback cluster is
         empty (no business databases); abort the whole flow otherwise.
      3. For each target storage instance, build a sub-pipeline that
         dispatches the db-actuator, creates the backup working directory
         and restores data according to ``rollback_type`` (local/remote
         backup, selected by time or by backup id).

    Temporary ADMIN accounts are created for all involved clusters
    (``need_random_pass_cluster_ids``) and dropped when the pipeline
    finishes (``is_drop_random_user=True``).
    """
    cluster_ids = [i["cluster_id"] for i in self.ticket_data["infos"]]
    mysql_restore_slave_pipeline = Builder(
        root_id=self.root_id,
        data=copy.deepcopy(self.ticket_data),
        need_random_pass_cluster_ids=list(set(cluster_ids)),
    )
    # dispatch db-actor
    sub_pipeline_list = []
    for info in self.ticket_data["infos"]:
        # NOTE: self.data is rebound for every info entry; everything below
        # operates on the per-entry state.
        self.data = copy.deepcopy(info)
        cluster_class = Cluster.objects.get(id=self.data["cluster_id"])
        master = cluster_class.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
        self.data["bk_biz_id"] = cluster_class.bk_biz_id
        self.data["bk_cloud_id"] = cluster_class.bk_cloud_id
        self.data["db_module_id"] = cluster_class.db_module_id
        self.data["module"] = cluster_class.db_module_id
        self.data["time_zone"] = cluster_class.time_zone
        self.data["created_by"] = self.ticket_data["created_by"]
        self.data["ticket_type"] = self.ticket_data["ticket_type"]
        self.data["cluster_type"] = cluster_class.cluster_type
        self.data["uid"] = self.ticket_data["uid"]
        self.data["city"] = cluster_class.region
        self.data["force"] = info.get("force", False)
        self.data["charset"], self.data["db_version"] = get_version_and_charset(
            self.data["bk_biz_id"],
            db_module_id=self.data["db_module_id"],
            cluster_type=self.data["cluster_type"],
        )

        sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))

        rollback_class = Cluster.objects.get(id=self.data["rollback_cluster_id"])
        storages = rollback_class.storageinstance_set.all()
        rollback_pipeline_list = []
        for rollback_storage in storages:
            # The target instances must hold no business databases before
            # we restore into them.
            if not check_storage_database(rollback_class, rollback_storage.machine.ip, rollback_storage.port):
                logger.error("cluster {} check database fail".format(rollback_class.id))
                raise NormalSpiderFlowException(
                    message=_("回档集群 {} 空闲检查不通过,请确认回档集群是否存在非系统数据库".format(rollback_class.id))
                )
            mycluster = {
                "name": cluster_class.name,
                "cluster_id": cluster_class.id,
                "cluster_type": cluster_class.cluster_type,
                "bk_biz_id": cluster_class.bk_biz_id,
                "bk_cloud_id": cluster_class.bk_cloud_id,
                "db_module_id": cluster_class.db_module_id,
                "databases": self.data["databases"],
                "tables": self.data["tables"],
                "databases_ignore": self.data["databases_ignore"],
                "tables_ignore": self.data["tables_ignore"],
                "charset": self.data["charset"],
                "change_master": False,
                # working dir on the target host, unique per flow run and port
                "file_target_path": "/data/dbbak/{}/{}".format(self.root_id, rollback_storage.port),
                "skip_local_exists": True,
                # match backup files produced by the source master's port
                "name_regex": "^.+{}\\.\\d+(\\..*)*$".format(master.port),
                "rollback_time": self.data["rollback_time"],
                "backupinfo": self.data["backupinfo"],
                "rollback_type": self.data["rollback_type"],
                "rollback_ip": rollback_storage.machine.ip,
                "rollback_port": rollback_storage.port,
                "backend_port": rollback_storage.port,
                "master_port": master.port,
                "master_ip": master.machine.ip,
            }
            exec_act_kwargs = ExecActuatorKwargs(
                bk_cloud_id=cluster_class.bk_cloud_id,
                cluster_type=ClusterType.TenDBHA,
                cluster=mycluster,
            )

            exec_act_kwargs.get_mysql_payload_func = MysqlActPayload.mysql_mkdir_dir.__name__
            exec_act_kwargs.exec_ip = rollback_storage.machine.ip
            rollback_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
            rollback_pipeline.add_act(
                act_name=_("下发db-actor到节点"),
                act_component_code=TransFileComponent.code,
                kwargs=asdict(
                    DownloadMediaKwargs(
                        bk_cloud_id=rollback_storage.bk_cloud_id,
                        exec_ip=[rollback_storage.machine.ip],
                        file_list=GetFileList(db_type=DBType.MySQL).get_db_actuator_package(),
                    )
                ),
            )
            rollback_pipeline.add_act(
                act_name=_("创建目录 {}".format(mycluster["file_target_path"])),
                act_component_code=ExecuteDBActuatorScriptComponent.code,
                kwargs=asdict(exec_act_kwargs),
            )

            # local backup + time
            if self.data["rollback_type"] == RollbackType.LOCAL_AND_TIME.value:
                inst_list = ["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)]
                # prefer a running stand-by slave (other than the target host)
                # as an additional backup source
                stand_by_slaves = cluster_class.storageinstance_set.filter(
                    instance_inner_role=InstanceInnerRole.SLAVE.value,
                    is_stand_by=True,
                    status=InstanceStatus.RUNNING.value,
                ).exclude(machine__ip__in=[rollback_storage.machine.ip])
                if len(stand_by_slaves) > 0:
                    inst_list.append(
                        "{}{}{}".format(stand_by_slaves[0].machine.ip, IP_PORT_DIVIDER, stand_by_slaves[0].port)
                    )
                rollback_pipeline.add_sub_pipeline(
                    sub_flow=mysql_rollback_data_sub_flow(
                        root_id=self.root_id,
                        ticket_data=copy.deepcopy(self.data),
                        cluster=mycluster,
                        cluster_model=cluster_class,
                        ins_list=inst_list,
                        is_rollback_binlog=True,
                    )
                )

            # remote backup + time
            elif self.data["rollback_type"] == RollbackType.REMOTE_AND_TIME.value:
                rollback_pipeline.add_sub_pipeline(
                    sub_flow=rollback_remote_and_time(
                        root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=mycluster
                    )
                )
            # remote backup + backup id
            elif self.data["rollback_type"] == RollbackType.REMOTE_AND_BACKUPID.value:
                rollback_pipeline.add_sub_pipeline(
                    sub_flow=rollback_remote_and_backupid(
                        root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=mycluster
                    )
                )

            # local backup + backup id
            elif self.data["rollback_type"] == RollbackType.LOCAL_AND_BACKUPID.value:
                rollback_pipeline.add_sub_pipeline(
                    sub_flow=rollback_local_and_backupid(
                        root_id=self.root_id, ticket_data=copy.deepcopy(self.data), cluster_info=mycluster
                    )
                )
            else:
                raise NormalTenDBFlowException(message=_("rollback_type不存在"))

            rollback_pipeline_list.append(
                rollback_pipeline.build_sub_process(
                    sub_name=_("定点回档到{}:{}".format(rollback_storage.machine.ip, rollback_storage.port))
                )
            )
        sub_pipeline.add_parallel_sub_pipeline(sub_flow_list=rollback_pipeline_list)
        sub_pipeline_list.append(
            sub_pipeline.build_sub_process(sub_name=_("定点回档到{}".format(rollback_class.immute_domain)))
        )
    mysql_restore_slave_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipeline_list)
    mysql_restore_slave_pipeline.run_pipeline(init_trans_data_class=ClusterInfoContext(), is_drop_random_user=True)
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ def rollback_remote_and_time(root_id: str, ticket_data: dict, cluster_info: dict
act_component_code=ExecuteDBActuatorScriptComponent.code,
kwargs=asdict(exec_act_kwargs),
)
return sub_pipeline.build_sub_process(sub_name=_("{}定点回滚数据 REMOTE_AND_TIME ".format(cluster_info["rollback_ip"])))
return sub_pipeline.build_sub_process(
sub_name=_("{}:{}定点回滚数据 REMOTE_AND_TIME ".format(cluster_info["rollback_ip"], cluster_info["rollback_port"]))
)


def rollback_remote_and_backupid(root_id: str, ticket_data: dict, cluster_info: dict):
Expand Down Expand Up @@ -267,7 +269,9 @@ def rollback_remote_and_backupid(root_id: str, ticket_data: dict, cluster_info:
write_payload_var="change_master_info",
)
return sub_pipeline.build_sub_process(
sub_name=_("{}定点回滚数据 REMOTE_AND_BACKUPID ".format(cluster_info["rollback_ip"]))
sub_name=_(
"{}:{}定点回滚数据 REMOTE_AND_BACKUPID ".format(cluster_info["rollback_ip"], cluster_info["rollback_port"])
)
)


Expand Down Expand Up @@ -314,5 +318,7 @@ def rollback_local_and_backupid(root_id: str, ticket_data: dict, cluster_info: d
write_payload_var="change_master_info",
)
return sub_pipeline.build_sub_process(
sub_name=_("{}定点回滚数据 LOCAL_AND_BACKUPID ".format(cluster_info["rollback_ip"]))
sub_name=_(
"{}:{}定点回滚数据 LOCAL_AND_BACKUPID ".format(cluster_info["rollback_ip"], cluster_info["rollback_port"])
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ def migrate_master_slave_flow(self):
)
),
)
sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))
sync_data_sub_pipeline_list.append(
sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复分片{}数据".format(shard_id)))
)

# 阶段4 切换remotedb
switch_sub_pipeline_list = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,11 @@ def tendb_remote_slave_recover(self):
)
),
)
sync_data_sub_pipeline_list.append(sync_data_sub_pipeline.build_sub_process(sub_name=_("恢复实例数据")))
sync_data_sub_pipeline_list.append(
sync_data_sub_pipeline.build_sub_process(
sub_name=_("{}:{}恢复实例数据".format(node["new_slave"]["ip"], node["new_slave"]["port"]))
)
)
# 阶段4 切换
switch_sub_pipeline_list = []
# 切换后写入元数据
Expand Down
7 changes: 7 additions & 0 deletions dbm-ui/backend/flow/engine/controller/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@ def mysql_rollback_data_cluster_scene(self):
flow = MySQLRollbackDataFlow(root_id=self.root_id, data=self.ticket_data)
flow.rollback_data_flow()

def mysql_rollback_to_cluster_scene(self):
    """Point-in-time data rollback into a designated target cluster."""
    MySQLRollbackDataFlow(root_id=self.root_id, data=self.ticket_data).rollback_to_cluster_flow()

def mysql_pt_table_sync_scene(self):
"""
mysql数据修复场景
Expand Down
Loading

0 comments on commit f40f39c

Please sign in to comment.