Skip to content

Commit

Permalink
fix: 故障后迁移standby从库域名的获取 TencentBlueKing#8608
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo committed Dec 17, 2024
1 parent b8ef874 commit a986cf1
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 49 deletions.
4 changes: 2 additions & 2 deletions dbm-ui/backend/configuration/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
MYSQL_ADMIN_USER = "ADMIN"
# sqlserver的用户登录admin账号名称
SQLSERVER_ADMIN_USER = "dbm_admin"
# TODO: job超时时间最大为86400,后续考虑让job平台调大限制
MYSQL_DATA_RESTORE_TIME = 259200
# TODO: job超时时间最大为 86400(而不是 259200),后续考虑让job平台调大限制
MYSQL_DATA_RESTORE_TIME = 86400
MYSQL_USUAL_JOB_TIME = 7200
MYSQL8_VER_PARSE_NUM = 8000000

Expand Down
1 change: 1 addition & 0 deletions dbm-ui/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# IP 端口分隔符
IP_PORT_DIVIDER = ":"
IP_PORT_DIVIDER_FOR_DNS = "#"

# IP 捕获正则表达式
IP_RE_PATTERN = r"(?:(?:2(?:5[0-5]|[0-4]\d))|[0-1]?\d{1,2})(?:\.(?:(?:2(?:5[0-5]|[0-4]\d))|[0-1]?\d{1,2})){3}"
Expand Down
18 changes: 11 additions & 7 deletions dbm-ui/backend/db_meta/api/cluster/tendbha/switch_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from django.db import transaction

from backend.db_meta.enums import InstancePhase, InstanceStatus
from backend.db_meta.enums import InstanceInnerRole, InstancePhase, InstanceStatus
from backend.db_meta.models import Cluster, StorageInstance

logger = logging.getLogger("root")
Expand All @@ -34,6 +34,16 @@ def switch_slave(cluster_id: int, target_slave_ip: str, source_slave_ip: str, sl
source_storage_obj = StorageInstance.objects.get(
machine__ip=source_slave_ip, port=cluster_storage_port, machine__bk_cloud_id=cluster.bk_cloud_id
)
source_master_obj = cluster.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
# 移除关系
cluster.storageinstance_set.remove(source_storage_obj)
cluster_entry_list = cluster.clusterentry_set.filter(entry__in=slave_domain)
for cluster_entry in cluster_entry_list:
# 可能由于切换,域名指向了主节点的。这里要清除
if source_storage_obj.is_stand_by:
cluster_entry.storageinstance_set.remove(source_master_obj)
cluster_entry.storageinstance_set.remove(source_storage_obj)
cluster_entry.storageinstance_set.add(target_storage_obj)
# target实例需要继承source实例的is_standby特性
target_storage_obj.is_stand_by = source_storage_obj.is_stand_by
target_storage_obj.status = InstanceStatus.RUNNING.value
Expand All @@ -43,12 +53,6 @@ def switch_slave(cluster_id: int, target_slave_ip: str, source_slave_ip: str, sl
source_storage_obj.phase = InstancePhase.OFFLINE.value
source_storage_obj.is_stand_by = False
source_storage_obj.save()
# 移除关系
cluster.storageinstance_set.remove(source_storage_obj)
cluster_entry_list = cluster.clusterentry_set.filter(entry__in=slave_domain)
for cluster_entry in cluster_entry_list:
cluster_entry.storageinstance_set.remove(source_storage_obj)
cluster_entry.storageinstance_set.add(target_storage_obj)


@transaction.atomic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from typing import Dict

from backend.constants import IP_PORT_DIVIDER_FOR_DNS
from backend.db_meta.enums import ClusterEntryRole, ClusterEntryType, InstanceInnerRole
from backend.db_meta.models import Cluster

Expand All @@ -26,29 +27,63 @@ def get_tendb_ha_entry(cluster_id: int) -> Dict:
master = cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
standby_ins = cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True)
slave_ins = cls.storageinstance_set.filter(instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=False)
# 主域名
entry_map["master_domain"] = cls.immute_domain
# entry_map[master.machine.ip] = cls.immute_domain

standby_ins_dns = standby_ins.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
if len(standby_ins_dns) == 0:
standby_ins_dns = master.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
# master 上存在从域名
master_slave_domain_dns = master.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
entry_map["master_has_slave_domain"] = [one.entry for one in master_slave_domain_dns]

# standby 域名,从本身节点获取,如果获取不到,则从主节点获取。
if len(standby_ins_dns) > 0:
entry_map["slave_domain"] = standby_ins_dns[0].entry
elif len(master_slave_domain_dns) > 0:
entry_map["slave_domain"] = master_slave_domain_dns[0].entry
else:
entry_map["slave_domain"] = ""

# standby上的从域名需要加上主节点存在的从域名,防止ha切换后遗留在主节点上的域名。
entry_map[standby_ins.machine.ip] = [one.entry for one in standby_ins_dns]
entry_map[standby_ins.machine.ip].extend([one.entry for one in master_slave_domain_dns])

# 其他slave域名 entry_map["ip"]=域名
for slave in slave_ins:
if slave.machine.ip not in entry_map:
entry_map[slave.machine.ip] = []
slave_dns = slave.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value)
slave_end_list = [slave_end.entry for slave_end in slave_dns]
entry_map[slave.machine.ip].extend(slave_end_list)
return entry_map

if standby_ins.machine.ip not in entry_map:
entry_map[standby_ins.machine.ip] = [one.entry for one in standby_ins_dns]
else:
entry_map[standby_ins.machine.ip].extend([one.entry for one in standby_ins_dns])

def get_standby_dns(cluster_id: int) -> Dict:
    """
    Collect the slave-entry DNS records bound to the master and to the standby slave of a cluster.

    The master is inspected as well as the standby; the comments elsewhere in this change say a
    switch-over can leave the standby's slave domain pointing at the master.

    @param cluster_id: id of the cluster to inspect
    @return: dict keyed by "<ip>#<port>" (IP_PORT_DIVIDER_FOR_DNS is the separator) of the master
             and/or standby instance, valued with a slave-entry domain bound to that instance.
             All records of one instance share the same key, so when an instance has several
             slave-entry DNS records only the last one iterated is kept.
    """
    cluster = Cluster.objects.get(id=cluster_id)
    storages = cluster.storageinstance_set
    master = storages.get(instance_inner_role=InstanceInnerRole.MASTER.value)
    standby = storages.get(instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True)

    dns_map = {}
    # Master first, then standby, so keys are inserted in that order.
    for instance in (master, standby):
        slave_entries = instance.bind_entry.filter(
            cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
        )
        for bound in slave_entries:
            instance_key = "{}{}{}".format(instance.machine.ip, IP_PORT_DIVIDER_FOR_DNS, instance.port)
            dns_map[instance_key] = bound.entry

    return dns_map
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from django.utils.translation import ugettext as _

from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterEntryType, InstanceInnerRole
from backend.db_meta.enums import ClusterEntryRole, ClusterEntryType, InstanceInnerRole
from backend.db_meta.models import Cluster
from backend.db_meta.models.extra_process import ExtraProcessInstance
from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER, InstanceStatus
Expand Down Expand Up @@ -205,9 +205,18 @@ def master_and_slave_switch(
}
]
old_slave = cluster.storageinstance_set.get(machine__ip=cluster_info["old_slave_ip"])
slave_dns_list = old_slave.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value).all()
slave_dns_list = old_slave.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
).all()
cluster_info["slave_dns_list"] = [i.entry for i in slave_dns_list]
# todo 域名映射应该映射老ip对应的所有域名
if old_slave.is_stand_by:
master_storage = cluster.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
mater_has_slave_dns_list = master_storage.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
).all()
cluster_info["slave_dns_list"].extend([i.entry for i in mater_has_slave_dns_list])
cluster_info["slave_dns_list"] = list(set(cluster_info["slave_dns_list"]))

for slave_domain in cluster_info["slave_dns_list"]:
acts_list.append(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.constants import IP_PORT_DIVIDER, IP_PORT_DIVIDER_FOR_DNS
from backend.db_meta.enums import InstanceStatus
from backend.db_meta.models import Cluster
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
Expand All @@ -27,6 +27,7 @@
DownloadMediaKwargs,
ExecuteRdsKwargs,
InstanceUserCloneKwargs,
IpDnsRecordRecycleKwargs,
RecycleDnsRecordKwargs,
)

Expand All @@ -45,7 +46,10 @@ def slave_migrate_switch_sub_flow(
""""""
# 默认预检测连接情况、同步延时、checksum校验结果
master = cluster.main_storage_instances()[0]
old_slave = "{}{}{}".format(old_slave_ip, IP_PORT_DIVIDER, master.port)
old_slave_storage = cluster.storageinstance_set.get(
machine__ip=old_slave_ip, port=master.port, machine__bk_cloud_id=cluster.bk_cloud_id
)
old_slave = "{}{}{}".format(old_slave_ip, IP_PORT_DIVIDER, old_slave_storage.port)
new_slave = "{}{}{}".format(new_slave_ip, IP_PORT_DIVIDER, master.port)
old_master = "{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)

Expand Down Expand Up @@ -102,10 +106,7 @@ def slave_migrate_switch_sub_flow(
"bk_cloud_id": cluster.bk_cloud_id,
}
]
slave_storage = cluster.storageinstance_set.filter(
status=InstanceStatus.RUNNING.value, machine__ip=old_slave_ip
).exists()
if slave_storage:
if old_slave_storage.status == InstanceStatus.RUNNING.value:
clone_data.append(
{
"source": old_slave,
Expand Down Expand Up @@ -137,6 +138,23 @@ def slave_migrate_switch_sub_flow(
),
}
)

# 以上已经添加了域名;如果被替换的从库是standby,还需要删除可能因切换而指向主库的standby域名记录。
if old_slave_storage.is_stand_by:
for domain in domain_map["master_has_slave_domain"]:
domain_add_list.append(
{
"act_name": _("删除master上的从域名{}:{} {}").format(master.machine.ip, master.port, domain),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
IpDnsRecordRecycleKwargs(
bk_cloud_id=cluster.bk_cloud_id,
instance_list=["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER_FOR_DNS, master.port)],
domain_name=domain,
)
),
}
)
if len(domain_add_list) > 0:
sub_pipeline.add_parallel_acts(acts_list=domain_add_list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.constants import IP_PORT_DIVIDER, IP_PORT_DIVIDER_FOR_DNS
from backend.db_meta.enums import ClusterType, InstanceInnerRole, InstancePhase, InstanceStatus
from backend.db_meta.models import Cluster
from backend.db_package.models import Package
from backend.flow.consts import MediumEnum
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_tendb_ha_entry
from backend.flow.engine.bamboo.scene.mysql.common.cluster_entrys import get_standby_dns, get_tendb_ha_entry
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import (
build_surrounding_apps_sub_flow,
install_mysql_in_cluster_sub_flow,
Expand Down Expand Up @@ -588,24 +588,46 @@ def restore_local_slave_flow(self):
kwargs=asdict(InstanceUserCloneKwargs(clone_data=clone_data)),
)

# 添加域名
domain_map = get_tendb_ha_entry(cluster_model.id)
domain_add_list = []
for domain in domain_map[target_slave.machine.ip]:
domain_add_list.append(
{
"act_name": _("添加从库域名{}:{}").format(target_slave.machine.ip, domain),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
UpdateDnsRecordKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
old_instance="{}#{}".format(target_slave.machine.ip, master.port),
new_instance="{}#{}".format(target_slave.machine.ip, master.port),
update_domain_name=domain,
)
),
}
)
# 这里区分是standby还是普通slave添加域名
if target_slave.is_stand_by:
domain_map = get_standby_dns(cluster_model.id)
domain_add_list = []
for old_instance, domain in domain_map.items():
domain_add_list.append(
{
"act_name": _("添加Standby从库域名{}:{}").format(target_slave.machine.ip, domain),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
UpdateDnsRecordKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
old_instance=old_instance,
new_instance="{}{}{}".format(
target_slave.machine.ip, IP_PORT_DIVIDER_FOR_DNS, master.port
),
update_domain_name=domain,
)
),
}
)
else:
# 非standby节点则刷新域名
domain_map = get_tendb_ha_entry(cluster_model.id)
domain_add_list = []
for domain in domain_map[target_slave.machine.ip]:
domain_add_list.append(
{
"act_name": _("添加从库域名{}:{}").format(target_slave.machine.ip, domain),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
UpdateDnsRecordKwargs(
bk_cloud_id=cluster_model.bk_cloud_id,
old_instance="{}#{}".format(target_slave.machine.ip, master.port),
new_instance="{}#{}".format(target_slave.machine.ip, master.port),
update_domain_name=domain,
)
),
}
)

if len(domain_add_list) > 0:
tendb_migrate_pipeline.add_parallel_acts(acts_list=domain_add_list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ def rollback_to_cluster_flow(self):
sub_pipeline_list = []
for info in self.ticket_data["infos"]:
self.data = copy.deepcopy(info)
# 判断是否全库回档,默认是全库,全库包括逻辑备份,物理备份. todo 如果指定部分库。则只能使用逻辑备份。
self.data["all_database_rollback"] = True
if not (
self.data["databases"] == "*" and self.data["tables"] == "*" and self.data["databases_ignore"] == ""
):
self.data["all_database_rollback"] = False
cluster_class = Cluster.objects.get(id=self.data["cluster_id"])
filters = Q(
cluster__cluster_type=ClusterType.TenDBSingle.value, instance_inner_role=InstanceInnerRole.ORPHAN.value
Expand Down Expand Up @@ -375,7 +381,7 @@ def rollback_to_cluster_flow(self):
)
),
)
if rollback_storage.instance_role in (InstanceRole.BACKEND_SLAVE, InstanceRole.BACKEND_REPEATER):
if self.data["all_database_rollback"]:
rollback_pipeline.add_act(
act_name=_("从库stop slave {}").format(rollback_storage.ip_port),
act_component_code=MySQLExecuteRdsComponent.code,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ def tendb_rollback_data(self):
tendb_rollback_list = []
for info in self.ticket_data["infos"]:
self.data = info
# 判断是否全库回档,默认是全库,全库包括逻辑备份,物理备份. todo 如果指定部分库。则只能使用逻辑备份。
self.data["all_database_rollback"] = True
if not (
self.data["databases"] == "*" and self.data["tables"] == "*" and self.data["databases_ignore"] == ""
):
self.data["all_database_rollback"] = False
source_cluster = Cluster.objects.get(id=self.data["source_cluster_id"])
target_cluster = Cluster.objects.get(id=self.data["target_cluster_id"])
self.data["uid"] = self.ticket_data["uid"]
Expand Down Expand Up @@ -148,6 +154,7 @@ def tendb_rollback_data(self):
"databases_ignore": self.data["databases_ignore"],
"tables_ignore": self.data["tables_ignore"],
"change_master": False,
"all_database_rollback": self.data["all_database_rollback"],
}
spd_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
spd_sub_pipeline.add_act(
Expand Down Expand Up @@ -265,6 +272,7 @@ def tendb_rollback_data(self):
"databases_ignore": self.data["databases_ignore"],
"tables_ignore": self.data["tables_ignore"],
"change_master": False,
"all_database_rollback": self.data["all_database_rollback"],
}

ins_sub_pipeline = SubBuilder(root_id=self.root_id, data=copy.deepcopy(self.data))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def remote_node_rollback(root_id: str, ticket_data: dict, cluster: dict):
sub_pipeline_all_list = []
instance_check_list = []
backup_info = cluster["backupinfo"]
if cluster["new_master"]["instance"] != cluster["new_slave"]["instance"]:
if cluster["new_master"]["instance"] != cluster["new_slave"]["instance"] and cluster["all_database_rollback"]:
sub_pipeline_all.add_act(
act_name=_("从库stop slave {}").format(cluster["new_slave"]["instance"]),
act_component_code=MySQLExecuteRdsComponent.code,
Expand Down

0 comments on commit a986cf1

Please sign in to comment.