Skip to content

Commit

Permalink
feat(redis): dts支持redis7 TencentBlueKing#8557
Browse files Browse the repository at this point in the history
  • Loading branch information
OMG-By authored and iSecloud committed Feb 14, 2025
1 parent 5d95db9 commit 9c9a6aa
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func (job *RedisDtsDataCheck) GetTools() (err error) {
// 这一部分和提取key保持一致
// 复制dbtools/ldb_tendisplus,ldb_with_len.3.8, ldb_with_len.5.13
// redis-shake redisSafeDeleteTool到get_keys_pattern
cpCmd := fmt.Sprintf("cp %s/ldb* %s/redis-shake %s/redisSafeDeleteTool %s", consts.DbToolsPath,
consts.DbToolsPath, consts.DbToolsPath, job.saveDir)
cpCmd := fmt.Sprintf("cp %s/ldb* %s/redis-shake %s/redis-shake-v4 %s/redisSafeDeleteTool %s", consts.DbToolsPath,
consts.DbToolsPath, consts.DbToolsPath, consts.DbToolsPath, job.saveDir)
job.runtime.Logger.Info(cpCmd)
_, err = util.RunBashCmd(cpCmd, "", nil, 100*time.Second)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ func (job *TendisKeysPattern) UntarMedia() (err error) {

// 复制dbtools/ldb_tendisplus,ldb_with_len.3.8, ldb_with_len.5.13
// redis-shake redisSafeDeleteTool到get_keys_pattern
cpCmd := fmt.Sprintf("cp %s/ldb* %s/redis-shake %s/redisSafeDeleteTool %s", consts.DbToolsPath,
consts.DbToolsPath, consts.DbToolsPath, job.saveDir)
cpCmd := fmt.Sprintf("cp %s/redis-shake-v4 %s/ldb* %s/redis-shake %s/redisSafeDeleteTool %s",
consts.DbToolsPath, consts.DbToolsPath, consts.DbToolsPath, consts.DbToolsPath, job.saveDir)

// // 这里复制所有的,是为了防止工具名变更,也可指定如上一行代码注释
// cpCmd := fmt.Sprintf("cp %s/* %s", consts.DbToolsPath, job.saveDir)
Expand Down Expand Up @@ -486,11 +486,14 @@ func (task *RedisInsKeyPatternTask) GetRedisShakeBin(fetchLatest bool) (bool, er
return false, task.Err
}
shakeTool := "redis-shake"
if task.BigVersion >= 7 {
shakeTool = "redis-shake-v4"
}
task.RedisShakeTool = filepath.Join(task.SaveDir, shakeTool)
// flow 里下发到指定目录 ,检查下发是否成功
_, err := os.Stat(task.RedisShakeTool)
if err != nil && os.IsNotExist(err) {
task.Err = fmt.Errorf("获取redis-shake失败,请检查是否下发成功:err:%v", err)
task.Err = fmt.Errorf("获取%s失败,请检查是否下发成功:err:%v", shakeTool, err)
task.runtime.Logger.Error(task.Err.Error())
return false, task.Err
}
Expand Down Expand Up @@ -857,6 +860,118 @@ func (task *RedisInsKeyPatternTask) tendisSSDAllKeys() {
util.RunLocalCmd("bash", []string{"-c", fmt.Sprintf("chown -R mysql.mysql %s", task.SaveDir)}, "", nil, 10*time.Second)
}

// tendisCacheVersionGe7AllKeys 获取redis7版本以上的所有key
func (task *RedisInsKeyPatternTask) tendisCacheVersionGe7AllKeys() {
task.GetRedisShakeBin(false)
if task.Err != nil {
return
}
value := `[rdb_reader]
filepath = "{{RDB_FULL_PATH}}"
[file_writer]
filepath = "{{RESULT_FULL_PATH}}"
type = "simple"
[filter]
allow_key_regex = ["{{KEY_WHITE_REGEX}}"]
block_key_regex = ["{{KEY_BLACK_REGEX}}"]
allow_db = []
block_db = []
[advanced]
dir = "{{PID_PATH}}"
log_file = "{{LOG_FILE}}"
log_level = "info" # debug, info or warn
log_interval = 5 # in seconds
pipeline_count_limit = 1024
target_redis_client_max_querybuf_len = 1073741824 # 1GB in bytes
target_redis_proto_max_bulk_len = 512_000_000
empty_db_before_sync = false
pprof_port = {{SYSTEM_PROFILE}} # pprof port, 0 means disable
status_port = {{HTTP_PROFILE}} # status port, 0 means disable
[module]
target_mbbloom_version = 20603
`

templateFile := filepath.Join(task.SaveDir, fmt.Sprintf("shake.%d.toml", task.Port))
logFile := fmt.Sprintf("shake.%d.log", task.Port)
pidPath := filepath.Join(task.SaveDir, fmt.Sprintf("redis-shake-logs/%d", task.Port))
httpProfile := task.Port + 500
systemProfile := task.Port + 5000
rdbFullPath := fmt.Sprintf("%s/dump.rdb", task.DataDir)

// 这个地方需要注意与V2的区别
whitePattern := task.getSafeRegexPattern(task.KeyWhiteRegex)
if whitePattern == ".*" {
whitePattern = "*"
}
blackPattern := task.getSafeRegexPattern(task.KeyBlackRegex)
if blackPattern == ".*" {
blackPattern = "*"
}
task.setResultFile()
if task.Err != nil {
return
}

// redis-shake-v4需要提前,v4会输出到配置的output file。 而redis-shake提取出来的文件默认带0(这个是工具行为)
task.ResultFile = task.ResultFile + ".0"
value = strings.ReplaceAll(value, "{{RDB_FULL_PATH}}", rdbFullPath)
value = strings.ReplaceAll(value, "{{RESULT_FULL_PATH}}", task.ResultFile)
value = strings.ReplaceAll(value, "{{KEY_WHITE_REGEX}}", whitePattern)
value = strings.ReplaceAll(value, "{{KEY_BLACK_REGEX}}", blackPattern)
value = strings.ReplaceAll(value, "{{PID_PATH}}", pidPath)
value = strings.ReplaceAll(value, "{{LOG_FILE}}", logFile)
value = strings.ReplaceAll(value, "{{HTTP_PROFILE}}", strconv.Itoa(httpProfile))
value = strings.ReplaceAll(value, "{{SYSTEM_PROFILE}}", strconv.Itoa(systemProfile))

err := ioutil.WriteFile(templateFile, []byte(value), 0755)
if err != nil {
task.Err = fmt.Errorf("ioutil.WriteFile fail,file:%s,err:%v", templateFile, err)
return
}
getKeyCmdNew := fmt.Sprintf("%s %s", task.RedisShakeTool, templateFile)
task.runtime.Logger.Info("getKey command:%s", getKeyCmdNew)

var cmdRet string
var msg string
maxRetryTimes := 5
for maxRetryTimes > 0 {
maxRetryTimes--
err = nil
cmdRet, err = util.RunLocalCmd("bash", []string{"-c", getKeyCmdNew}, "", nil, 24*time.Hour)
if err != nil && (strings.Contains(cmdRet, "address already in use") || strings.Contains(err.Error(),
"address already in use")) {
msg = fmt.Sprintf("command:%s port address already in use,retry...", getKeyCmdNew)
task.runtime.Logger.Error(msg)
value = strings.ReplaceAll(value, fmt.Sprintf("pprof_port = %d", httpProfile), fmt.Sprintf("pprof_port = %d",
httpProfile+500))
value = strings.ReplaceAll(value, fmt.Sprintf("status_port = %d", systemProfile),
fmt.Sprintf("status_port = %d", systemProfile+500))
httpProfile += 500
systemProfile += 500
ioutil.WriteFile(templateFile, []byte(value), 0755)
continue
} else if err != nil {
msg = fmt.Sprintf("command:%s err:%v,retry...", getKeyCmdNew, err)
task.runtime.Logger.Error(msg)

time.Sleep(5 * time.Second)
continue
}
break
}
if err != nil {
task.Err = fmt.Errorf("command:%s failed,err:%v,cmdRet:%s", getKeyCmdNew, err, cmdRet)
task.runtime.Logger.Error(task.Err.Error())
return
}
task.runtime.Logger.Info("tendisCacheAllKeys :run success command:%s", getKeyCmdNew)
return
}

// tendisCacheAllKeys 获取所有key
// NOCC:golint/fnsize(设计如此)
func (task *RedisInsKeyPatternTask) tendisCacheAllKeys() {
Expand Down Expand Up @@ -1029,7 +1144,11 @@ func (task *RedisInsKeyPatternTask) GetTendisKeys() {
if task.Err != nil {
return
}
task.tendisCacheAllKeys()
if task.BigVersion >= 7 {
task.tendisCacheVersionGe7AllKeys()
} else {
task.tendisCacheAllKeys()
}
} else if task.TendisType == consts.TendisTypeTendisplusInsance {
task.tendisplusAllKeys()
} else if task.TendisType == consts.TendisTypeTendisSSDInsance {
Expand Down Expand Up @@ -1160,7 +1279,8 @@ type RedisInsTask struct {
Port int `json:"port"`
Password string `json:"password"`
TendisType string `json:"tendis_type"`
Version string `json:"version"` // 区分ssd版本差异,需要使用对应的ldb工具
Version string `json:"version"` // 区分ssd版本差异,需要使用对应的ldb工具
BigVersion int `json:"bing_version"` // 根据版本使用不同同居
Role string `json:"role"`
DataDir string `json:"data_dir"`
DataSize uint64 `json:"data_size"` // 设置并发度
Expand Down Expand Up @@ -1224,6 +1344,13 @@ func (task *RedisInsTask) newConnect() error {
return task.Err
}

task.BigVersion, task.Err = strconv.Atoi(strings.Split(task.Version, ".")[0])
if task.Err != nil {
task.Err = fmt.Errorf("newConnect GetTendisBigVersion Err:%v", task.Err)
task.runtime.Logger.Error(task.Err.Error())
return task.Err
}

// 获取数据量大小
if task.TendisType == consts.TendisTypeRedisInstance {
task.DataSize, task.Err = task.redisCli.RedisInstanceDataSize()
Expand Down
Loading

0 comments on commit 9c9a6aa

Please sign in to comment.