diff --git a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst/const.go b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst/const.go index b57f21d68b..c9c86a7b90 100644 --- a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst/const.go +++ b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst/const.go @@ -16,6 +16,10 @@ const ( BackupNone = "none" ) +const ( + StorageEnginRocksdb = "rocksdb" +) + // backup role: dbm-services/mysql/db-tools/dbactuator/pkg/core/cst/mysql.go const ( // RoleMaster lower case @@ -112,8 +116,9 @@ const MysqlCrondUrl = "http://127.0.0.1:9999" const MysqlRotateBinlogInstallPath = "/home/mysql/mysql-rotatebinlog" const ( - ToolMydumper = "mydumper" - ToolMysqldump = "mysqldump" - ToolXtrabackup = "xtrabackup" - ToolTmysqldump = "tmysqldump" + ToolMydumper = "mydumper" + ToolMysqldump = "mysqldump" + ToolXtrabackup = "xtrabackup" + ToolTmysqldump = "tmysqldump" + ToolMyrocksHotbackup = "myrocks_hotbackup" ) diff --git a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper.go b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper.go index b5250ae4f2..b50bb2911b 100644 --- a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper.go +++ b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper.go @@ -24,7 +24,7 @@ type Dumper interface { } // BuildDumper return logical or physical dumper -func BuildDumper(cnf *config.BackupConfig) (dumper Dumper, err error) { +func BuildDumper(cnf *config.BackupConfig, storageEngine string) (dumper Dumper, err error) { if err = precheck.CheckBackupType(cnf); err != nil { return nil, err } @@ -66,13 +66,21 @@ func BuildDumper(cnf *config.BackupConfig) (dumper Dumper, err error) { if err := validate.GoValidateStruct(cnf.PhysicalBackup, false, false); err != nil { return nil, err } - dumper = &PhysicalDumper{ - cnf: cnf, + + if cst.StorageEnginRocksdb == storageEngine { + dumper = &PhysicalRocksdbDumper{ + cfg: cnf, + } + } else { + dumper = &PhysicalDumper{ + cnf: cnf, + } } } else { logger.Log.Error(fmt.Sprintf("Unknown BackupType: %s", cnf.Public.BackupType)) err := fmt.Errorf("unknown BackupType: %s", cnf.Public.BackupType) return nil, err } + return dumper, nil } diff --git a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical.go b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical.go index e1c05a36bb..60b42c6ce7 100644 --- a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical.go +++ b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical.go @@ -162,7 +162,7 @@ func (p *PhysicalDumper) Execute(enableTimeOut bool) error { _ = db.Close() }() if originVal, err := mysqlconn.SetGlobalVarAndReturnOrigin("slave_parallel_workers", "0", db); err != nil { - logger.Log.Error("set global slave_parallel_workers=0 failed, err: %s", err.Error()) + logger.Log.Errorf("set global slave_parallel_workers=0 failed, err: %s", err.Error()) return err } else { logger.Log.Infof("will set global slave_parallel_workers=%s after backup finished", originVal) diff --git a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical_rocksdb.go b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical_rocksdb.go new file mode 100644 index 0000000000..4274268ea3 --- /dev/null +++ b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/dumper_physical_rocksdb.go @@ -0,0 +1,231 @@ +package backupexe + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/pkg/errors" + + "dbm-services/mysql/db-tools/mysql-dbbackup/pkg/config" + "dbm-services/mysql/db-tools/mysql-dbbackup/pkg/cst" + "dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/dbareport" + "dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/logger" + "dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/mysqlconn" + "dbm-services/mysql/db-tools/mysql-dbbackup/pkg/util" +) + +type PhysicalRocksdbDumper struct { + cfg *config.BackupConfig + dbbackupHome string + checkpointDir string + mysqlVersion string + isOfficial bool + rocksdbCmd string + storageEngine string + backupStartTime time.Time + backupEndTime time.Time +} + +func (p *PhysicalRocksdbDumper) buildArgs() []string { + + targetPath := filepath.Join(p.cfg.Public.BackupDir, p.cfg.Public.TargetName()) + + args := []string{ + fmt.Sprintf("--host=%s", p.cfg.Public.MysqlHost), + fmt.Sprintf("--port=%d", p.cfg.Public.MysqlPort), + fmt.Sprintf("--user=%s", p.cfg.Public.MysqlUser), + fmt.Sprintf("--password=%s", p.cfg.Public.MysqlPasswd), + fmt.Sprintf("--checkpoint_dir=%s", p.checkpointDir), + fmt.Sprintf("--backup_dir=%s", targetPath), + "--stream=disabled", + } + + if strings.ToLower(p.cfg.Public.MysqlRole) == cst.RoleSlave { + args = append(args, "--slave_info") + } + + if strings.ToLower(p.cfg.Public.MysqlRole) == cst.RoleMaster { + args = append(args, "--master_info") + } + + return args +} + +func (p *PhysicalRocksdbDumper) initConfig(mysqlVersion string) error { + if p.cfg == nil { + return errors.New("rocksdb physical dumper config missed") + } + + cmdPath, err := os.Executable() + + if err != nil { + return err + } + + p.dbbackupHome = filepath.Dir(cmdPath) + db, err := mysqlconn.InitConn(&p.cfg.Public) + + if err != nil { + return err + } + + p.mysqlVersion, p.isOfficial = util.VersionParser(mysqlVersion) + p.storageEngine, err = mysqlconn.GetStorageEngine(db) + + if err != nil { + return err + } + p.storageEngine = strings.ToLower(p.storageEngine) + + defer func() { + _ = db.Close() + }() + + p.checkpointDir = fmt.Sprintf("%s/MyRocks_checkpoint", p.cfg.Public.BackupDir) + p.rocksdbCmd = "/bin/" + cst.ToolMyrocksHotbackup + BackupTool = cst.ToolMyrocksHotbackup + return nil +} + +func (p *PhysicalRocksdbDumper) Execute(enableTimeOut bool) error { + p.backupStartTime = time.Now() + defer func() { + p.backupEndTime = time.Now() + }() + + if p.storageEngine != cst.StorageEnginRocksdb { + err := fmt.Errorf("%s engine not support", p.storageEngine) + logger.Log.Error(err) + return err + } + + _, err := os.Stat(p.checkpointDir) + if os.IsNotExist(err) { + err = os.MkdirAll(p.checkpointDir, 0755) + } + + if err != nil { + logger.Log.Errorf("failed to create checkpoint(%s), err-msg:%s", p.checkpointDir, err) + return err + } + + binPath := filepath.Join(p.dbbackupHome, p.rocksdbCmd) + args := p.buildArgs() + + var cmd *exec.Cmd + backupCmd := fmt.Sprintf(`%s %s`, binPath, strings.Join(args, " ")) + + if enableTimeOut { + timeDiffUinx, err := GetMaxRunningTime(p.cfg.Public.BackupTimeOut) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), (time.Duration(timeDiffUinx))*time.Second) + defer cancel() + + cmd = exec.CommandContext(ctx, "sh", "-c", backupCmd) + } else { + cmd = exec.Command("sh", "-c", backupCmd) + } + + backuplogFilename := fmt.Sprintf("%s_backup_%d_%d.log", p.storageEngine, p.cfg.Public.MysqlPort, int(time.Now().Weekday())) + rocksdbBackuplogFilename := filepath.Join(p.dbbackupHome, "logs", backuplogFilename) + + outFile, err := os.Create(rocksdbBackuplogFilename) + + if err != nil { + logger.Log.Error("create log file failed: ", err) + return err + } + + defer func() { + _ = outFile.Close() + }() + + cmd.Stdout = outFile + cmd.Stderr = outFile + logger.Log.Info("rocksdb backup command: ", cmd.String()) + + err = cmd.Run() + if err != nil { + logger.Log.Error("run rocksdb physical backup failed: ", err) + return err + } + + return nil +} + +func (p *PhysicalRocksdbDumper) PrepareBackupMetaInfo(cnf *config.BackupConfig) (*dbareport.IndexContent, error) { + db, err := mysqlconn.InitConn(&cnf.Public) + if err != nil { + return nil, errors.WithMessage(err, "IndexContent") + } + + defer func() { + _ = db.Close() + }() + + storageEngine, err := mysqlconn.GetStorageEngine(db) + if err != nil { + return nil, err + } + + storageEngine = strings.ToLower(storageEngine) + + if storageEngine != "rocksdb" { + logger.Log.Errorf("unknown storage engine(%s)", storageEngine) + return nil, nil + } + + xtrabackupBinlogInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_binlog_info") + xtrabackupSlaveInfoFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "xtrabackup_slave_info") + + tmpFileName := filepath.Join(cnf.Public.BackupDir, cnf.Public.TargetName(), "tmp_dbbackup_go.txt") + + exepath, err := os.Executable() + if err != nil { + return nil, err + } + + exepath = filepath.Dir(exepath) + qpressPath := filepath.Join(exepath, "bin", "qpress") + + var metaInfo = dbareport.IndexContent{ + BinlogInfo: dbareport.BinlogStatusInfo{}, + } + + if masterStatus, err := parseXtraBinlogInfo(qpressPath, xtrabackupBinlogInfoFileName, tmpFileName); err != nil { + return nil, err + } else { + metaInfo.BinlogInfo.ShowMasterStatus = masterStatus + metaInfo.BinlogInfo.ShowMasterStatus.MasterHost = cnf.Public.MysqlHost + metaInfo.BinlogInfo.ShowMasterStatus.MasterPort = cnf.Public.MysqlPort + } + + if mysqlRole := strings.ToLower(cnf.Public.MysqlRole); mysqlRole == cst.RoleSlave || mysqlRole == cst.RoleRepeater { + if slaveStatus, err := parseXtraSlaveInfo(qpressPath, xtrabackupSlaveInfoFileName, tmpFileName); err != nil { + return nil, err + } else { + metaInfo.BinlogInfo.ShowSlaveStatus = slaveStatus + masterHost, masterPort, err := mysqlconn.ShowMysqlSlaveStatus(db) + if err != nil { + return nil, err + } + metaInfo.BinlogInfo.ShowSlaveStatus.MasterHost = masterHost + metaInfo.BinlogInfo.ShowSlaveStatus.MasterPort = masterPort + } + } + + metaInfo.JudgeIsFullBackup(&cnf.Public) + if err = os.Remove(tmpFileName); err != nil { + return &metaInfo, err + } + + return &metaInfo, nil +} diff --git a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/execute_dump.go b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/execute_dump.go index c836400dcb..6c89015a83 100644 --- a/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/execute_dump.go +++ b/dbm-services/mysql/db-tools/mysql-dbbackup/pkg/src/backupexe/execute_dump.go @@ -33,10 +33,17 @@ func ExecuteBackup(cnf *config.BackupConfig) (*dbareport.IndexContent, error) { if err != nil { return nil, err } + + storageEngine, err := mysqlconn.GetStorageEngine(db) + if err != nil { + return nil, err + } + storageEngine = strings.ToLower(storageEngine) + mysqlVersion, isOfficial := util.VersionParser(versionStr) XbcryptBin = GetXbcryptBin(mysqlVersion, isOfficial) - dumper, err := BuildDumper(cnf) + dumper, err := BuildDumper(cnf, storageEngine) if err != nil { return nil, err }