Skip to content

Commit

Permalink
fix(redis): 修复构造实例回写数据的bug #4859
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemakeit authored and zhangzhw8 committed Jun 12, 2024
1 parent f5a8479 commit b08cefb
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 22 deletions.
24 changes: 20 additions & 4 deletions dbm-services/redis/db-tools/dbactuator/pkg/util/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func SplitLargeFile(file, splitTargetSize string, rmOrigin bool) (splitedFiles [
// 参数: tarSaveDir 为 /tmp/
// 返回值: tarFile 为 /tmp/REDIS-FULL-rocksdb-1.1.1.1-30000.tar
func TarADir(originDir, tarSaveDir string, rmOrigin bool) (tarFile string, err error) {
var tarCmd string
var tarCmd, rmCmd string
basename := filepath.Base(originDir)
baseDir := filepath.Dir(originDir)
if tarSaveDir == "" {
Expand All @@ -137,12 +137,28 @@ func TarADir(originDir, tarSaveDir string, rmOrigin bool) (tarFile string, err e
tarFile = filepath.Join(tarSaveDir, basename+".tar")

if rmOrigin {
tarCmd = fmt.Sprintf(`tar --remove-files -cf %s -C %s %s`, tarFile, baseDir, basename)
tarCmd = fmt.Sprintf(`cd %s && tar --remove-files -cf %s %s && rm -rf %s`,
baseDir, filepath.Base(tarFile), basename, basename)
rmCmd = fmt.Sprintf("rm -f %s", tarFile)
} else {
tarCmd = fmt.Sprintf(`tar -cf %s -C %s %s`, tarFile, baseDir, basename)
tarCmd = fmt.Sprintf(`cd %s && tar -cf %s %s`, baseDir, filepath.Base(tarFile), basename)
rmCmd = fmt.Sprintf("rm -f %s", tarFile)
}
mylog.Logger.Info(tarCmd)
_, err = RunBashCmd(tarCmd, "", nil, 6*time.Hour)
maxRetryTimes := 5
for maxRetryTimes >= 0 {
maxRetryTimes--
err = nil
_, err = RunBashCmd(tarCmd, "", nil, 6*time.Hour)
if err != nil {
// 如果报错则删除tar文件然后重试
mylog.Logger.Info(rmCmd)
RunBashCmd(rmCmd, "", nil, 10*time.Minute)
continue
}
// tar命令成功了,则退出
break
}
if err != nil {
return
}
Expand Down
15 changes: 8 additions & 7 deletions dbm-services/redis/db-tools/dbmon/models/myredis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,17 +988,18 @@ func (db *RedisClient) SlaveOf(masterIP, masterPort string) (ret string, err err

// IsClusterEnabled 'cluster-enabled' 是否启动
func (db *RedisClient) IsClusterEnabled() (clusterEnabled bool, err error) {
confData, err := db.ConfigGet("cluster-enabled")
infoRet, err := db.Info("Cluster")
if err != nil {
return
}
val, ok := confData["cluster-enabled"]
if ok && (strings.ToLower(val) == "yes" ||
strings.ToLower(val) == "on" ||
strings.ToLower(val) == "1") {
clusterEnabled = true
ret, ok := infoRet["cluster_enabled"]
if !ok {
return false, nil
}
return
if ret == "1" {
return true, nil
}
return false, nil
}

// ClusterMeet 'cluster meet' command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ func (task *RedisMonitorTask) SetDbmonKeyOnMaster() {
if clusterEnabled {
continue
}
cliItem.SelectDB(1)
task.Err = cliItem.SelectDB(1)
if task.Err != nil {
task.eventSender.SendWarning(consts.EventRedisLogin, task.Err.Error(),
consts.WarnLevelError, task.ServerConf.ServerIP)
return
}

dbmonKey = fmt.Sprintf("dbmon:%s:%d", task.ServerConf.ServerIP, task.ServerConf.ServerPorts[idx])
_, task.Err = cliItem.Set(dbmonKey, timeStr, 0)
Expand Down
24 changes: 20 additions & 4 deletions dbm-services/redis/db-tools/dbmon/util/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func SplitLargeFile(file, splitTargetSize string, rmOrigin bool) (splitedFiles [
// 参数: tarSaveDir 为 /tmp/
// 返回值: tarFile 为 /tmp/REDIS-FULL-rocksdb-1.1.1.1-30000.tar
func TarADir(originDir, tarSaveDir string, rmOrigin bool) (tarFile string, err error) {
var tarCmd string
var tarCmd, rmCmd string
basename := filepath.Base(originDir)
baseDir := filepath.Dir(originDir)
if tarSaveDir == "" {
Expand All @@ -136,12 +136,28 @@ func TarADir(originDir, tarSaveDir string, rmOrigin bool) (tarFile string, err e
tarFile = filepath.Join(tarSaveDir, basename+".tar")

if rmOrigin {
tarCmd = fmt.Sprintf(`tar --remove-files -cf %s -C %s %s`, tarFile, baseDir, basename)
tarCmd = fmt.Sprintf(`cd %s && tar --remove-files -cf %s %s && rm -rf %s`,
baseDir, filepath.Base(tarFile), basename, basename)
rmCmd = fmt.Sprintf("rm -f %s", tarFile)
} else {
tarCmd = fmt.Sprintf(`tar -cf %s -C %s %s`, tarFile, baseDir, basename)
tarCmd = fmt.Sprintf(`cd %s && tar -cf %s %s`, baseDir, filepath.Base(tarFile), basename)
rmCmd = fmt.Sprintf("rm -f %s", tarFile)
}
mylog.Logger.Info(tarCmd)
_, err = RunBashCmd(tarCmd, "", nil, 6*time.Hour)
maxRetryTimes := 5
for maxRetryTimes >= 0 {
maxRetryTimes--
err = nil
_, err = RunBashCmd(tarCmd, "", nil, 6*time.Hour)
if err != nil {
// 如果报错则删除tar文件然后重试
mylog.Logger.Info(rmCmd)
RunBashCmd(rmCmd, "", nil, 10*time.Minute)
continue
}
// tar命令成功了,则退出
break
}
if err != nil {
return
}
Expand Down
3 changes: 3 additions & 0 deletions dbm-ui/backend/flow/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
DEFAULT_TWEMPROXY_ADMIN_PORT_EXTRA = 1000
# 默认Redis dbnum
DEFAULT_REDIS_DBNUM = 0
# 默认Redis databases
DEFAULT_REDIS_INSTANCE_DATABASES = 2
DEFAULT_REDIS_CLUSTER_DATABASES = 1

# 切换时, 默认允许多久心跳
DEFAULT_MASTER_DIFF_TIME = 61
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@
is_tendisssd_instance_type,
is_twemproxy_proxy_type,
)
from backend.flow.consts import ConfigFileEnum, StateType, WriteContextOpType
from backend.flow.consts import (
DEFAULT_REDIS_CLUSTER_DATABASES,
DEFAULT_REDIS_INSTANCE_DATABASES,
ConfigFileEnum,
StateType,
WriteContextOpType,
)
from backend.flow.engine.bamboo.scene.common.atom_jobs.set_dns_sub_job import set_dns_atom_job
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
Expand Down Expand Up @@ -136,7 +142,9 @@ def redis_cluster_data_copy_flow(self):

# 获取云区域的dns nameserver
sub_pipeline.add_act(
act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs)
act_name=_("初始化配置"),
act_component_code=GetRedisActPayloadComponent.code,
kwargs=asdict(act_kwargs),
)

if (
Expand Down Expand Up @@ -338,7 +346,12 @@ def get_dst_cluster_install_param(self, info: dict) -> dict:
install_param["cluster_password"] = src_cluster_info["cluster_password"]
install_param["redis_password"] = src_cluster_info["redis_password"]
install_param["redis_proxy_admin_password"] = src_cluster_info.get("redis_proxy_admin_password", "")
install_param["redis_databases"] = src_cluster_info["redis_databases"]
if is_twemproxy_proxy_type(info.get("target_cluster_type", src_cluster_info["cluster_type"])):
install_param["redis_databases"] = DEFAULT_REDIS_INSTANCE_DATABASES
elif is_redis_cluster_protocal(info.get("target_cluster_type", src_cluster_info["cluster_type"])):
install_param["redis_databases"] = DEFAULT_REDIS_CLUSTER_DATABASES
else:
install_param["redis_databases"] = src_cluster_info["redis_databases"]
install_param["max_disk"] = info["max_disk"]
install_param["maxmemory"] = info["maxmemory"]
install_param["region"] = src_cluster_info["region"]
Expand Down Expand Up @@ -503,7 +516,9 @@ def shard_num_or_cluster_type_update_flow(self):
)

redis_pipeline.add_act(
act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs)
act_name=_("初始化配置"),
act_component_code=GetRedisActPayloadComponent.code,
kwargs=asdict(act_kwargs),
)

dst_cluster_installed = self.is_dst_cluster_installed(info, dst_install_param)
Expand Down Expand Up @@ -722,7 +737,9 @@ def online_switch_flow(self):
act_kwargs.cluster = {}

redis_pipeline.add_act(
act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs)
act_name=_("初始化配置"),
act_component_code=GetRedisActPayloadComponent.code,
kwargs=asdict(act_kwargs),
)

acts_list = []
Expand Down Expand Up @@ -767,7 +784,9 @@ def online_switch_flow(self):

# 再次初始化 redis_act_payload,以便actuator中能获取前面 前置检查结果
redis_pipeline.add_act(
act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs)
act_name=_("初始化配置"),
act_component_code=GetRedisActPayloadComponent.code,
kwargs=asdict(act_kwargs),
)

# 下发 proxy 安装介质,actuator介质
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ def check_src_cluster_nodes_ok(self, dts_copy_type: str, src_data: dict) -> List
raise Exception(
"src_cluster:{} not found masters(with_slots),master:{}".format(src_data["cluster_addr"], master_addr)
)
if dts_copy_type == DtsCopyType.COPY_FROM_ROLLBACK_INSTANCE.value:
self.log_info(
"src_cluster:{} dst_copy_type:{} skip checking if master has a slave".format(
src_data["cluster_addr"], dts_copy_type
)
)
return
slaves_by_masterid = group_slaves_by_master_id(cluster_nodes_str)
meta_slaves = {}
for src_slave in src_data["slave_instances"]:
Expand Down

0 comments on commit b08cefb

Please sign in to comment.