Skip to content

Commit

Permalink
fix: 故障后迁移standby从库域名的获取 #8608
Browse files Browse the repository at this point in the history
  • Loading branch information
zfrendo committed Dec 24, 2024
1 parent 0e7eff3 commit a385071
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 97 deletions.
1 change: 1 addition & 0 deletions dbm-ui/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# IP 端口分隔符
IP_PORT_DIVIDER = ":"
IP_PORT_DIVIDER_FOR_DNS = "#"

# IP 捕获正则表达式
IP_RE_PATTERN = r"(?:(?:2(?:5[0-5]|[0-4]\d))|[0-1]?\d{1,2})(?:\.(?:(?:2(?:5[0-5]|[0-4]\d))|[0-1]?\d{1,2})){3}"
Expand Down
27 changes: 16 additions & 11 deletions dbm-ui/backend/db_meta/api/cluster/tendbha/switch_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from django.db import transaction

from backend.db_meta.enums import InstancePhase, InstanceStatus
from backend.db_meta.enums import InstanceInnerRole, InstancePhase, InstanceStatus
from backend.db_meta.models import Cluster, StorageInstance

logger = logging.getLogger("root")
Expand All @@ -34,21 +34,26 @@ def switch_slave(cluster_id: int, target_slave_ip: str, source_slave_ip: str, sl
source_storage_obj = StorageInstance.objects.get(
machine__ip=source_slave_ip, port=cluster_storage_port, machine__bk_cloud_id=cluster.bk_cloud_id
)
source_master_obj = cluster.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
cluster_entry_list = cluster.clusterentry_set.filter(entry__in=slave_domain)
for cluster_entry in cluster_entry_list:
# 可能由于切换,域名指向了主节点的。这里要清除
if source_storage_obj.is_stand_by is True:
cluster_entry.storageinstance_set.remove(source_master_obj)
cluster_entry.storageinstance_set.remove(source_storage_obj)
cluster_entry.storageinstance_set.add(target_storage_obj)
# target实例需要继承source实例的is_standby特性
target_storage_obj.is_stand_by = source_storage_obj.is_stand_by
target_storage_obj.status = InstanceStatus.RUNNING.value
target_storage_obj.phase = InstancePhase.ONLINE.value
target_storage_obj.save()
source_storage_obj.status = InstanceStatus.UNAVAILABLE.value
source_storage_obj.phase = InstancePhase.OFFLINE.value
source_storage_obj.is_stand_by = False
source_storage_obj.save()
# 移除关系
cluster.storageinstance_set.remove(source_storage_obj)
cluster_entry_list = cluster.clusterentry_set.filter(entry__in=slave_domain)
for cluster_entry in cluster_entry_list:
cluster_entry.storageinstance_set.remove(source_storage_obj)
cluster_entry.storageinstance_set.add(target_storage_obj)
# 移除关系。如果相等说明是原地重建,以上内容只为修正域名映射关系。
if source_storage_obj.ip_port != target_storage_obj.ip_port:
cluster.storageinstance_set.remove(source_storage_obj)
source_storage_obj.status = InstanceStatus.UNAVAILABLE.value
source_storage_obj.phase = InstancePhase.OFFLINE.value
source_storage_obj.is_stand_by = False
source_storage_obj.save()


@transaction.atomic
Expand Down
17 changes: 16 additions & 1 deletion dbm-ui/backend/db_meta/api/cluster/tendbha/switch_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,14 @@

from django.db import transaction

from backend.db_meta.enums import InstancePhase, InstanceRoleInstanceInnerRoleMap, InstanceStatus
from backend.db_meta.enums import (
ClusterEntryRole,
ClusterEntryType,
InstanceInnerRole,
InstancePhase,
InstanceRoleInstanceInnerRoleMap,
InstanceStatus,
)
from backend.db_meta.models import Cluster, StorageInstance
from backend.flow.utils.mysql.mysql_module_operate import MysqlCCTopoOperator

Expand Down Expand Up @@ -71,3 +78,11 @@ def change_storage_cluster_entry(cluster_id: int, slave_ip: str, new_slave_ip: s
for be in slave_storage.bind_entry.all():
be.storageinstance_set.remove(slave_storage)
be.storageinstance_set.add(new_slave_storage)
# 如果是standby节点,为了防止主节点故障dbHa切换后。从域名实际上指向的是主节点。需要从主节点读取域名并移除和添加
if slave_storage.is_stand_by is True:
master_storage = cluster.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
for be in master_storage.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
):
be.storageinstance_set.remove(master_storage)
be.storageinstance_set.add(new_slave_storage)
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from typing import Dict

from backend.constants import IP_PORT_DIVIDER_FOR_DNS
from backend.db_meta.enums import ClusterEntryRole, ClusterEntryType, InstanceInnerRole
from backend.db_meta.models import Cluster

Expand All @@ -26,29 +27,63 @@ def get_tendb_ha_entry(cluster_id: int) -> Dict:
master = cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
standby_ins = cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True)
slave_ins = cls.storageinstance_set.filter(instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=False)
# 主域名
entry_map["master_domain"] = cls.immute_domain
# entry_map[master.machine.ip] = cls.immute_domain

standby_ins_dns = standby_ins.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
if len(standby_ins_dns) == 0:
standby_ins_dns = master.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
# master 上存在从域名
master_slave_domain_dns = master.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
entry_map["master_has_slave_domain"] = [one.entry for one in master_slave_domain_dns]

# standby 域名,从本身节点获取,如果获取不到,则从主节点获取。
if len(standby_ins_dns) > 0:
entry_map["slave_domain"] = standby_ins_dns[0].entry
elif len(master_slave_domain_dns) > 0:
entry_map["slave_domain"] = master_slave_domain_dns[0].entry
else:
entry_map["slave_domain"] = ""

# standby上的从域名需要加上主节点存在的从域名,防止ha切换后遗留在主节点上的域名。
entry_map[standby_ins.machine.ip] = [one.entry for one in standby_ins_dns]
entry_map[standby_ins.machine.ip].extend([one.entry for one in master_slave_domain_dns])

# 其他slave域名 entry_map["ip"]=域名
for slave in slave_ins:
if slave.machine.ip not in entry_map:
entry_map[slave.machine.ip] = []
slave_dns = slave.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value)
slave_end_list = [slave_end.entry for slave_end in slave_dns]
entry_map[slave.machine.ip].extend(slave_end_list)
return entry_map

if standby_ins.machine.ip not in entry_map:
entry_map[standby_ins.machine.ip] = [one.entry for one in standby_ins_dns]
else:
entry_map[standby_ins.machine.ip].extend([one.entry for one in standby_ins_dns])

def get_standby_dns(cluster_id: int):
"""
获取tendb ha 集群相关的所有域名。
@param cluster_id: tendb ha 集群id
@return: dns map
"""
cls = Cluster.objects.get(id=cluster_id)
entry_map = {}
master = cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
standby_ins = cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True)

standby_ins_dns_from_master = master.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
standby_ins_dns_from_standby = standby_ins.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
)
for dns_master in standby_ins_dns_from_master:
entry_map["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER_FOR_DNS, master.port)] = dns_master.entry

for dns_standby in standby_ins_dns_from_standby:
entry_map[
"{}{}{}".format(standby_ins.machine.ip, IP_PORT_DIVIDER_FOR_DNS, standby_ins.port)
] = dns_standby.entry

return entry_map
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from django.utils.crypto import get_random_string
from django.utils.translation import ugettext as _

from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterEntryType, InstanceInnerRole
from backend.constants import IP_PORT_DIVIDER, IP_PORT_DIVIDER_FOR_DNS
from backend.db_meta.enums import ClusterEntryRole, ClusterEntryType, InstanceInnerRole
from backend.db_meta.models import Cluster
from backend.db_meta.models.extra_process import ExtraProcessInstance
from backend.flow.consts import ACCOUNT_PREFIX, AUTH_ADDRESS_DIVIDER, InstanceStatus
Expand All @@ -38,6 +38,7 @@
CreateDnsKwargs,
ExecActuatorKwargs,
InstanceUserCloneKwargs,
IpDnsRecordRecycleKwargs,
RecycleDnsRecordKwargs,
)
from backend.flow.utils.mysql.mysql_act_playload import MysqlActPayload
Expand Down Expand Up @@ -103,7 +104,6 @@ def master_and_slave_switch(
if sub_flow:
cluster_switch_sub_pipeline.add_sub_pipeline(sub_flow=sub_flow)

# todo ?授权切换账号
add_sw_user_kwargs = AddSwitchUserKwargs(
bk_cloud_id=cluster.bk_cloud_id,
user=switch_account,
Expand Down Expand Up @@ -192,26 +192,23 @@ def master_and_slave_switch(
"kwargs": asdict(cluster_sw_kwargs),
}
)
cluster_switch_sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 更改旧slave 和 新slave 的域名映射关系,并发执行
acts_list = [
{
"act_name": _("回收旧slave的域名映射"),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
RecycleDnsRecordKwargs(
dns_op_exec_port=cluster_info["mysql_port"],
exec_ip=cluster_info["old_slave_ip"],
bk_cloud_id=cluster_info["bk_cloud_id"],
)
),
}
]
if len(acts_list) > 0:
cluster_switch_sub_pipeline.add_parallel_acts(acts_list=acts_list)
# 从库添加域名
acts_list = []
old_slave = cluster.storageinstance_set.get(machine__ip=cluster_info["old_slave_ip"])
slave_dns_list = old_slave.bind_entry.filter(cluster_entry_type=ClusterEntryType.DNS.value).all()
slave_dns_list = old_slave.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
).all()
cluster_info["slave_dns_list"] = [i.entry for i in slave_dns_list]
# todo 域名映射应该映射老ip对应的所有域名
mater_has_slave_dns_list = []
master_storage = cluster.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
if old_slave.is_stand_by is True:
mater_has_slave_dns_list = master_storage.bind_entry.filter(
cluster_entry_type=ClusterEntryType.DNS.value, role=ClusterEntryRole.SLAVE_ENTRY.value
).all()
cluster_info["slave_dns_list"].extend([i.entry for i in mater_has_slave_dns_list])
cluster_info["slave_dns_list"] = list(set(cluster_info["slave_dns_list"]))
for slave_domain in cluster_info["slave_dns_list"]:
acts_list.append(
{
Expand All @@ -227,6 +224,44 @@ def master_and_slave_switch(
),
}
)
if len(acts_list) > 0:
cluster_switch_sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 移除主 从节点 域名 .如果主库存在从域名,需移除
acts_list = []
for bind_entry in mater_has_slave_dns_list:
acts_list.append(
{
"act_name": _("对主节点移除从域名:{}".format(bind_entry.entry)),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
IpDnsRecordRecycleKwargs(
instance_list=[
"{}{}{}".format(master_storage.machine.ip, IP_PORT_DIVIDER_FOR_DNS, master_storage.port)
],
domain_name=bind_entry.entry,
bk_cloud_id=cluster_info["bk_cloud_id"],
)
),
}
)

# 移除从库本身映射的域名
acts_list.append(
{
"act_name": _("回收旧slave的域名映射"),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
RecycleDnsRecordKwargs(
dns_op_exec_port=cluster_info["mysql_port"],
exec_ip=cluster_info["old_slave_ip"],
bk_cloud_id=cluster_info["bk_cloud_id"],
)
),
}
)
if len(acts_list) > 0:
cluster_switch_sub_pipeline.add_parallel_acts(acts_list=acts_list)

# 增加tbinlogdumper实例部署切换联动
if ExtraProcessInstance.objects.filter(cluster_id=cluster.id).exists():
Expand All @@ -241,9 +276,6 @@ def master_and_slave_switch(
)
),
)

cluster_switch_sub_pipeline.add_parallel_acts(acts_list=acts_list)

return cluster_switch_sub_pipeline.build_sub_process(sub_name=_("{}集群执行成对切换").format(cluster_info["cluster_id"]))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from django.utils.translation import ugettext as _

from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.constants import IP_PORT_DIVIDER, IP_PORT_DIVIDER_FOR_DNS
from backend.db_meta.enums import InstanceStatus
from backend.db_meta.models import Cluster
from backend.flow.engine.bamboo.scene.common.builder import SubBuilder
Expand All @@ -27,6 +27,7 @@
DownloadMediaKwargs,
ExecuteRdsKwargs,
InstanceUserCloneKwargs,
IpDnsRecordRecycleKwargs,
RecycleDnsRecordKwargs,
)

Expand All @@ -45,7 +46,10 @@ def slave_migrate_switch_sub_flow(
""""""
# 默认预检测连接情况、同步延时、checksum校验结果
master = cluster.main_storage_instances()[0]
old_slave = "{}{}{}".format(old_slave_ip, IP_PORT_DIVIDER, master.port)
old_slave_storage = cluster.storageinstance_set.get(
machine__ip=old_slave_ip, port=master.port, machine__bk_cloud_id=cluster.bk_cloud_id
)
old_slave = "{}{}{}".format(old_slave_ip, IP_PORT_DIVIDER, old_slave_storage.port)
new_slave = "{}{}{}".format(new_slave_ip, IP_PORT_DIVIDER, master.port)
old_master = "{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER, master.port)

Expand Down Expand Up @@ -102,10 +106,7 @@ def slave_migrate_switch_sub_flow(
"bk_cloud_id": cluster.bk_cloud_id,
}
]
slave_storage = cluster.storageinstance_set.filter(
status=InstanceStatus.RUNNING.value, machine__ip=old_slave_ip
).exists()
if slave_storage:
if old_slave_storage.status == InstanceStatus.RUNNING.value:
clone_data.append(
{
"source": old_slave,
Expand Down Expand Up @@ -137,6 +138,23 @@ def slave_migrate_switch_sub_flow(
),
}
)

# 以上已经添加了域名,如果替换的从库是standby,需要删除可能因为切换导致standby域名指向了主库的可能。
if old_slave_storage.is_stand_by is True:
for domain in domain_map["master_has_slave_domain"]:
domain_add_list.append(
{
"act_name": _("删除master上的从域名{}:{} {}").format(master.machine.ip, master.port, domain),
"act_component_code": MySQLDnsManageComponent.code,
"kwargs": asdict(
IpDnsRecordRecycleKwargs(
bk_cloud_id=cluster.bk_cloud_id,
instance_list=["{}{}{}".format(master.machine.ip, IP_PORT_DIVIDER_FOR_DNS, master.port)],
domain_name=domain,
)
),
}
)
if len(domain_add_list) > 0:
sub_pipeline.add_parallel_acts(acts_list=domain_add_list)

Expand Down
Loading

0 comments on commit a385071

Please sign in to comment.