Skip to content

Commit

Permalink
fix(redis): redis-dts修复获取port重复的bug #4666
Browse files (browse the repository at this point in the history)
  • Loading branch information
lukemakeit authored and zhangzhw8 committed Jun 3, 2024
1 parent e42dbdf commit c066db7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 30 deletions.
4 changes: 2 additions & 2 deletions dbm-services/redis/redis-dts/models/mysql/tendisdb/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func (t *TbTendisDTSTask) ToString() string {

// TaskLockKey keyname
func (t *TbTendisDTSTask) TaskLockKey() string {
return fmt.Sprintf("TendisDTS_task_lock_%d_%s_%s_%s_%d",
return fmt.Sprintf("TendisDTS_task_lock_%d_%s_%s_%s_%d_%d",
t.BillID, t.SrcCluster, t.DstCluster,
t.SrcIP, t.SrcPort)
t.SrcIP, t.SrcPort, t.SrcKvStoreID)
}

// IsAllDtsTasksToForceKill 是否全部tasks都等待被force kill
Expand Down
19 changes: 11 additions & 8 deletions dbm-services/redis/redis-dts/pkg/dtsJob/tendisplusDtsJob.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func (job *TendisplusDtsJob) TryAcceptTasks(taskRows []*tendisdb.TbTendisDTSTask
job.logger.Info(fmt.Sprintf("myself:%s get task dts lock ok,key:%s", job.ServerIP, taskRows[0].TaskLockKey()))
// 尝试认领task成功
job.logger.Info(fmt.Sprintf(
`myself:%s 认领task,下一步开始迁移,billID:%d srcCluster:%s dstCluster:%s srcRedis:%s#%d`,
`myself:%s 认领task,下一步开始迁移,billID:%d srcCluster:%s dstCluster:%s srcRedis:%s#%d kvStoreID:%d`,
job.ServerIP, taskRows[0].BillID, taskRows[0].SrcCluster,
taskRows[0].DstCluster, taskRows[0].SrcIP, taskRows[0].SrcPort))
taskRows[0].DstCluster, taskRows[0].SrcIP, taskRows[0].SrcPort, taskRows[0].SrcKvStoreID))
taskIDs := make([]int64, 0, len(taskRows))
for _, tmpTask := range taskRows {
task := tmpTask
Expand Down Expand Up @@ -165,7 +165,7 @@ func (job *TendisplusDtsJob) ClaimDtsJobs() (err error) {
var memOK bool
var availMemSizeMigration int64
var toScheduleTasks []*tendisdb.TbTendisDTSTask
var slaveAddrToTasks map[string][]*tendisdb.TbTendisDTSTask
// var slaveAddrToTasks map[string][]*tendisdb.TbTendisDTSTask
var srcSlaveConcurrOK bool
var acceptOk bool
// 在迁移tendisplus时,其本身数据量不影响,所以用MAX_INT64值
Expand Down Expand Up @@ -217,11 +217,14 @@ func (job *TendisplusDtsJob) ClaimDtsJobs() (err error) {
if len(toScheduleTasks) == 0 {
continue
}
slaveAddrToTasks, err = job.TasksGroupBySlaveAddr(toScheduleTasks)
if err != nil {
continue
}
for _, taskRows := range slaveAddrToTasks {
// slaveAddrToTasks, err = job.TasksGroupBySlaveAddr(toScheduleTasks)
// if err != nil {
// continue
// }
// for _, taskRows := range slaveAddrToTasks {
for _, tmpTask := range toScheduleTasks {
taskItem := tmpTask
taskRows := []*tendisdb.TbTendisDTSTask{taskItem}
if availMemSizeMigration < 1*constvar.GiByte {
// 如果可用内存小于1GB,则不再继续
break
Expand Down
18 changes: 10 additions & 8 deletions dbm-services/redis/redis-dts/pkg/dtsTask/tendisplus/makeSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"dbm-services/redis/redis-dts/models/mysql/tendisdb"
"dbm-services/redis/redis-dts/pkg/constvar"
"dbm-services/redis/redis-dts/pkg/dtsTask"
"dbm-services/redis/redis-dts/tclog"
"dbm-services/redis/redis-dts/util"

"github.com/jinzhu/gorm"
Expand Down Expand Up @@ -283,7 +282,9 @@ func (task *MakeSyncTask) TendisplusMasterSlaveConfigSet() {
func (task *MakeSyncTask) getMySyncPort(initSyncPort int) {
taskTypes := []string{}
var syncerPort int
taskTypes = append(taskTypes, constvar.MakeSyncTaskType)
taskTypes = append(taskTypes, constvar.TendisplusMakeSyncTaskType)
taskTypes = append(taskTypes, constvar.TendisplusSendBulkTaskType)
taskTypes = append(taskTypes, constvar.TendisplusSendIncrTaskType)
if initSyncPort <= 0 {
initSyncPort = 40000
dtsSvrMaxSyncPortTask, err := tendisdb.GetDtsSvrMaxSyncPort(task.RowData.BkCloudID, task.RowData.DtsServer,
Expand Down Expand Up @@ -508,7 +509,7 @@ func (task *MakeSyncTask) RedisSyncInfo(section string) (infoRets map[string]str
func (task *MakeSyncTask) IsSyncAlive() (isAlive bool, err error) {
isSyncAliaveCmd := fmt.Sprintf("ps -ef|grep 'taskid%d-'|grep 'kvstore-%d-'|grep -v grep|grep sync|grep conf || true",
task.RowData.ID, task.RowData.SrcKvStoreID)
tclog.Logger.Info("", zap.String("isSyncAliaveCmd", isSyncAliaveCmd))
task.Logger.Info("", zap.String("isSyncAliaveCmd", isSyncAliaveCmd))
ret, err := util.RunLocalCmd("bash", []string{"-c", isSyncAliaveCmd}, "", nil, 1*time.Minute, task.Logger)
if err != nil {
return false, err
Expand Down Expand Up @@ -578,11 +579,11 @@ func (task *MakeSyncTask) IsSyncStateOK() (ok bool) {
func (task *MakeSyncTask) RedisSyncStop() {
isAlive, err := task.IsSyncAlive()
if !isAlive {
tclog.Logger.Info(fmt.Sprintf("RedisSyncStop srcRedis:%s kvStore:%d sync is not alive",
task.Logger.Info(fmt.Sprintf("RedisSyncStop srcRedis:%s kvStore:%d sync is not alive",
task.GetSrcRedisAddr(), task.RowData.SrcKvStoreID))
return
}
tclog.Logger.Info(fmt.Sprintf("RedisSyncStop srcRedis:%s kvStore:%d sync is alive",
task.Logger.Info(fmt.Sprintf("RedisSyncStop srcRedis:%s kvStore:%d sync is alive",
task.GetSrcRedisAddr(), task.RowData.SrcKvStoreID))

opts := []string{"SYNCADMIN", "stop"}
Expand Down Expand Up @@ -629,9 +630,9 @@ func (task *MakeSyncTask) RedisSyncStop() {

// RedisSyncStart 启动redis-sync
func (task *MakeSyncTask) RedisSyncStart(reacquirePort bool) {
tclog.Logger.Info(fmt.Sprintf("redis-sync start srcRedisAddr:%s kvStoreId:%d dstCluster:%s ...",
task.Logger.Info(fmt.Sprintf("redis-sync start srcRedisAddr:%s kvStoreId:%d dstCluster:%s ...",
task.GetSrcRedisAddr(), task.RowData.SrcKvStoreID, task.GetDstRedisAddr()))
defer tclog.Logger.Info("end redis-sync start")
defer task.Logger.Info("end redis-sync start")

if reacquirePort {
task.getMySyncPort(0)
Expand Down Expand Up @@ -699,7 +700,7 @@ func (task *MakeSyncTask) RedisSyncStart(reacquirePort bool) {
if task.Err != nil {
return
}
tclog.Logger.Info("redis-sync 'syncadmin start' success", zap.String("cmdRet", ret02))
task.Logger.Info("redis-sync 'syncadmin start' success", zap.String("cmdRet", ret02))

task.UpdateDbAndLogLocal("redis-sync %d start success", task.RowData.SyncerPort)
return
Expand Down Expand Up @@ -838,6 +839,7 @@ func (task *MakeSyncTask) WatchSync() {
task.SetTaskType(constvar.TendisplusSendBulkTaskType)
task.UpdateDbAndLogLocal("全量迁移中,binlog_pos:%d,lag:%d", myRockSlave.BinlogPos, myRockSlave.Lag)
} else {
task.SetTendisBinlogLag(myRockSlave.Lag)
task.SetTaskType(constvar.TendisplusSendIncrTaskType)
task.UpdateDbAndLogLocal("增量同步中,binlog_pos:%d,lag:%d", myRockSlave.BinlogPos, myRockSlave.Lag)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ def redis_cluster_data_check_repair_flow(self):
act_kwargs.cluster = self.__get_dts_job_data(info)

sub_pipeline.add_act(
act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs)
act_name=_("初始化配置"),
act_component_code=GetRedisActPayloadComponent.code,
kwargs=asdict(act_kwargs),
)

exec_ips = set()
Expand Down Expand Up @@ -167,34 +169,40 @@ def __get_dts_job_data(self, info: dict) -> dict:

ret["dts_copy_type"] = job_row.dts_copy_type
src_ips_set = set()
src_ip_port_set = set()
if len(info["src_instances"]) == 1 and info["src_instances"][0].upper() == "ALL":
for row in TbTendisDtsTask.objects.filter(where).all():
if first_task is None:
first_task = row
src_ips_set.add(row.src_ip)
if row.src_ip in ret:
ret[row.src_ip].append(
{"port": row.src_port, "segment_start": row.src_seg_start, "segment_end": row.src_seg_end}
)
else:
ip_port = f"{row.src_ip}:{row.src_port}"
if row.src_ip not in ret:
ret[row.src_ip] = [
{"port": row.src_port, "segment_start": row.src_seg_start, "segment_end": row.src_seg_end}
]
elif ip_port not in src_ip_port_set:
ret[row.src_ip].append(
{"port": row.src_port, "segment_start": row.src_seg_start, "segment_end": row.src_seg_end}
)
src_ip_port_set.add(ip_port)
else:
for src_inst in info["src_instances"]:
src_ip, src_port = src_inst.split(":")
for row in TbTendisDtsTask.objects.filter(where).filter(src_ip=src_ip, src_port=int(src_port)).all():
if first_task is None:
first_task = row
src_ips_set.add(row.src_ip)
if row.src_ip in ret:
ret[row.src_ip].append(
{"port": row.src_port, "segment_start": row.src_seg_start, "segment_end": row.src_seg_end}
)
else:
ip_port = f"{row.src_ip}:{row.src_port}"
if row.src_ip not in ret:
ret[row.src_ip] = [
{"port": row.src_port, "segment_start": row.src_seg_start, "segment_end": row.src_seg_end}
]
elif ip_port not in src_ip_port_set:
ret[row.src_ip].append(
{"port": row.src_port, "segment_start": row.src_seg_start, "segment_end": row.src_seg_end}
)
src_ip_port_set.add(ip_port)

if first_task is None:
logger.error(
"get dts task not found,bill_id:{} src_cluster:{} dst_cluster:{} src_instances:{}".format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ def get_src_redis_host_concurrency(self, trans_data: RedisDtsContext, kwargs: Ac
if kwargs["cluster"]["dts_copy_type"] == DtsCopyType.USER_BUILT_TO_DBM:
return 5
if max_datasize_instance["db_type"] == ClusterType.TendisTendisplusInsance:
return 10
return 5

if max_datasize_instance["data_size"] == 0:
return 5
Expand Down

0 comments on commit c066db7

Please sign in to comment.