From 91ccfa03a89e3fa04c948c87d08e3c59c1341c9f Mon Sep 17 00:00:00 2001 From: Xu-Wentao Date: Wed, 29 Mar 2023 18:45:13 +0800 Subject: [PATCH] bugfix(pitr): fix storage nodes info struct error, add endtime to backup file, and fix backup mode ptrack did not work. (#288) --- pitr/agent/internal/handler/backup.go | 8 +-- pitr/cli/go.mod | 2 + pitr/cli/go.sum | 5 ++ pitr/cli/internal/cmd/backup.go | 69 ++++++++++++++----- pitr/cli/internal/cmd/cmd_suite_test.go | 1 + pitr/cli/internal/cmd/restore.go | 5 ++ pitr/cli/internal/cmd/restore_test.go | 43 ++++++++++-- pitr/cli/internal/cmd/root.go | 5 +- pitr/cli/internal/pkg/local-storage.go | 2 +- pitr/cli/internal/pkg/model/ls_backup.go | 15 ++-- pitr/cli/internal/pkg/shardingsphere-proxy.go | 27 ++++++-- pitr/cli/pkg/gsutil/conn_test.go | 29 ++++++-- 12 files changed, 161 insertions(+), 50 deletions(-) diff --git a/pitr/agent/internal/handler/backup.go b/pitr/agent/internal/handler/backup.go index b69e16da..c58c8237 100644 --- a/pitr/agent/internal/handler/backup.go +++ b/pitr/agent/internal/handler/backup.go @@ -20,14 +20,12 @@ package handler import ( "errors" "fmt" - "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg" - "github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder" + "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons" "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/handler/view" - + "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg" + "github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder" "github.com/gofiber/fiber/v2" - - "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons" ) func Backup(ctx *fiber.Ctx) error { diff --git a/pitr/cli/go.mod b/pitr/cli/go.mod index 191b4607..14b51d00 100644 --- a/pitr/cli/go.mod +++ b/pitr/cli/go.mod @@ -5,8 +5,10 @@ go 1.20 require ( bou.ke/monkey v1.0.2 gitee.com/opengauss/openGauss-connector-go-pq v1.0.4 + github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 + github.com/jarcoal/httpmock v1.3.0 github.com/onsi/ginkgo/v2 v2.9.1 github.com/onsi/gomega v1.27.3 github.com/spf13/cobra v1.6.1 diff --git a/pitr/cli/go.sum b/pitr/cli/go.sum index e3198424..3855af18 100644 --- a/pitr/cli/go.sum +++ b/pitr/cli/go.sum @@ -4,6 +4,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT gitee.com/opengauss/openGauss-connector-go-pq v1.0.4 h1:npfLM9/QpkmdK+XY9X2pcC2EX5gosyn/6dRDRd2sEJs= gitee.com/opengauss/openGauss-connector-go-pq v1.0.4/go.mod h1:2UEp+ug6ls6C0pLfZgBn7VBzBntFUzxJuy+6FlQ7qyI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= +github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -52,6 +54,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= +github.com/jarcoal/httpmock v1.3.0/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= @@ -60,6 +64,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04nTH68g= github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk= github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzmGAPfuo= github.com/onsi/gomega v1.27.3 h1:5VwIwnBY3vbBDOJrNtA4rVdiTZCsq9B5F12pvy1Drmk= diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go index 8e7449b0..8d3b3a9a 100644 --- a/pitr/cli/internal/cmd/backup.go +++ b/pitr/cli/internal/cmd/backup.go @@ -49,6 +49,17 @@ var BackupCmd = &cobra.Command{ fmt.Printf("Flag: %s Value: %s\n", flag.Name, flag.Value) }) + // convert BackupModeStr to BackupMode + switch BackupModeStr { + case "FULL", "full": + BackupMode = model.BDBackModeFull + case "PTRACK", "ptrack": + BackupMode = model.DBBackModePTrack + } + if BackupMode == model.DBBackModePTrack { + logging.Warn("Please make sure all openGauss nodes have been set correct configuration about ptrack. You can refer to https://support.huaweicloud.com/intl/zh-cn/devg-opengauss/opengauss_devg_1362.html for more details.") + } + logging.Info(fmt.Sprintf("Default backup path: %s", pkg.DefaultRootDir())) // Start backup @@ -71,7 +82,7 @@ func init() { _ = BackupCmd.MarkFlagRequired("password") BackupCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path") _ = BackupCmd.MarkFlagRequired("dn-backup-path") - BackupCmd.Flags().StringVarP(&BackupMode, "dn-backup-mode", "b", "", "openGauss data backup mode (FULL|PTRACK)") + BackupCmd.Flags().StringVarP(&BackupModeStr, "dn-backup-mode", "b", "", "openGauss data backup mode (FULL|PTRACK)") _ = BackupCmd.MarkFlagRequired("dn-backup-mode") BackupCmd.Flags().Uint8VarP(&ThreadsNum, "dn-threads-num", "j", 1, "openGauss data backup threads nums") BackupCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port") @@ -88,6 +99,7 @@ func init() { // 6. Update local backup info // 7. Double check backups all finished func backup() error { + var err error proxy, err := pkg.NewShardingSphereProxy(Username, Password, pkg.DefaultDbName, Host, Port) if err != nil { return xerr.NewCliErr("create ss-proxy connect failed") @@ -98,48 +110,65 @@ func backup() error { return xerr.NewCliErr("create local storage failed") } + defer func() { + if err != nil { + logging.Info("try to unlock cluster ...") + if err := proxy.Unlock(); err != nil { + logging.Error(fmt.Sprintf("coz backup failed, try to unlock cluster, but still failed, err:%s", err.Error())) + } + } + }() + // Step1. lock cluster - if err := proxy.LockForBackup(); err != nil { + logging.Info("Starting lock cluster ...") + err = proxy.LockForBackup() + if err != nil { return xerr.NewCliErr("lock for backup failed") } // Step2. Get cluster info and save local backup info + logging.Info("Starting export metadata ...") lsBackup, err := exportData(proxy, ls) if err != nil { return xerr.NewCliErr(fmt.Sprintf("export backup data failed, err:%s", err.Error())) } - logging.Info(fmt.Sprintf("export backup data success, backup filename: %s", filename)) + logging.Info(fmt.Sprintf("Export backup data success, backup filename: %s", filename)) // Step3. send backup command to agent-server. - if err := execBackup(lsBackup); err != nil { - // if backup failed, still need to unlock cluster. - if err := proxy.Unlock(); err != nil { - logging.Error(fmt.Sprintf("coz exec backup failed, try to unlock cluster, but still failed, err:%s", err.Error())) - } + logging.Info("Starting backup ...") + err = execBackup(lsBackup) + if err != nil { return xerr.NewCliErr(fmt.Sprintf("exec backup failed, err:%s", err.Error())) } // Step4. unlock cluster - if err := proxy.Unlock(); err != nil { + logging.Info("Starting unlock cluster ...") + err = proxy.Unlock() + if err != nil { return xerr.NewCliErr(fmt.Sprintf("unlock cluster failed, err:%s", err.Error())) } // Step5. update backup file - if err := ls.WriteByJSON(filename, lsBackup); err != nil { + logging.Info("Starting update backup file ...") + err = ls.WriteByJSON(filename, lsBackup) + if err != nil { return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err:%s", err.Error())) } - // Step6. check agent server backup status + // Step6. check agent server backup + logging.Info("Starting check backup status ...") status := checkBackupStatus(lsBackup) - logging.Info(fmt.Sprintf("backup result:%s", status)) + logging.Info(fmt.Sprintf("Backup result: %s", status)) // Step7. finished backup and update backup file - if err := ls.WriteByJSON(filename, lsBackup); err != nil { + logging.Info("Starting update backup file ...") + err = ls.WriteByJSON(filename, lsBackup) + if err != nil { return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err: %s", err.Error())) } - logging.Info("backup finished") + logging.Info("Backup finished!") return nil } @@ -166,10 +195,11 @@ func exportData(proxy pkg.IShardingSphereProxy, ls pkg.ILocalStorage) (lsBackup contents := &model.LsBackup{ Info: &model.BackupMetaInfo{ - ID: uuid.New().String(), // generate uuid for this backup - CSN: csn, - StartTime: time.Now().Unix(), - EndTime: 0, + ID: uuid.New().String(), // generate uuid for this backup + CSN: csn, + StartTime: time.Now().Unix(), + EndTime: 0, + BackupMode: BackupMode, }, SsBackup: &model.SsBackup{ Status: model.SsBackupStatusWaiting, // default status of backup is model.SsBackupStatusWaiting @@ -231,7 +261,7 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model. Password: node.Password, DnBackupPath: BackupPath, DnThreadsNum: ThreadsNum, - DnBackupMode: model.BDBackModeFull, + DnBackupMode: BackupMode, Instance: defaultInstance, } backupID, err := as.Backup(in) @@ -297,6 +327,7 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus { } lsBackup.SsBackup.Status = backupFinalStatus + lsBackup.Info.EndTime = time.Now().Unix() return backupFinalStatus } diff --git a/pitr/cli/internal/cmd/cmd_suite_test.go b/pitr/cli/internal/cmd/cmd_suite_test.go index 103c622a..1551eec4 100644 --- a/pitr/cli/internal/cmd/cmd_suite_test.go +++ b/pitr/cli/internal/cmd/cmd_suite_test.go @@ -48,6 +48,7 @@ func initLog() { } logging.Init(logger) } + func TestCmd(t *testing.T) { initLog() RegisterFailHandler(Fail) diff --git a/pitr/cli/internal/cmd/restore.go b/pitr/cli/internal/cmd/restore.go index cfa0082d..13ed7d1b 100644 --- a/pitr/cli/internal/cmd/restore.go +++ b/pitr/cli/internal/cmd/restore.go @@ -145,6 +145,11 @@ func checkDatabaseExist(proxy pkg.IShardingSphereProxy, bak *model.LsBackup) err databaseNamesExist = append(databaseNamesExist, k) } } + + if len(databaseNamesExist) == 0 { + return nil + } + // get user input to confirm return getUserApproveInTerminal() } diff --git a/pitr/cli/internal/cmd/restore_test.go b/pitr/cli/internal/cmd/restore_test.go index f1cc01a5..297531e8 100644 --- a/pitr/cli/internal/cmd/restore_test.go +++ b/pitr/cli/internal/cmd/restore_test.go @@ -19,6 +19,8 @@ package cmd import ( + "reflect" + "bou.ke/monkey" "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg" mock_pkg "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/mocks" @@ -60,9 +62,9 @@ var _ = Describe("Restore", func() { var _ = Describe("test restore", func() { var ( proxy *mock_pkg.MockIShardingSphereProxy - //ls *mock_pkg.MockILocalStorage - as *mock_pkg.MockIAgentServer - bak = &model.LsBackup{ + ls *mock_pkg.MockILocalStorage + as *mock_pkg.MockIAgentServer + bak = &model.LsBackup{ Info: nil, DnList: nil, SsBackup: &model.SsBackup{ @@ -87,19 +89,27 @@ var _ = Describe("test restore", func() { Remark: "", } ) + BeforeEach(func() { ctrl = gomock.NewController(GinkgoT()) proxy = mock_pkg.NewMockIShardingSphereProxy(ctrl) as = mock_pkg.NewMockIAgentServer(ctrl) - //ls = mock_pkg.NewMockILocalStorage(ctrl) - monkey.Patch(getUserApproveInTerminal, func() error { return nil }) + ls = mock_pkg.NewMockILocalStorage(ctrl) + monkey.Patch(pkg.NewShardingSphereProxy, func(user, password, database, host string, port uint16) (pkg.IShardingSphereProxy, error) { + return proxy, nil + }) + monkey.Patch(pkg.NewLocalStorage, func(rootDir string) (pkg.ILocalStorage, error) { + return ls, nil + }) }) + AfterEach(func() { ctrl.Finish() monkey.UnpatchAll() }) It("check database if exists", func() { + monkey.Patch(getUserApproveInTerminal, func() error { return nil }) proxy.EXPECT().ExportMetaData() Expect(checkDatabaseExist(proxy, bak)).To(BeNil()) }) @@ -112,6 +122,29 @@ var _ = Describe("test restore", func() { Expect(<-failedCh).To(BeNil()) }) + It("test exec restore main func", func() { + // patch ReadByID of mock ls + monkey.PatchInstanceMethod(reflect.TypeOf(ls), "ReadByID", func(_ *mock_pkg.MockILocalStorage, _ string) (*model.LsBackup, error) { + return bak, nil + }) + // mock ExportMetaData and return a *ClusterInfo with bak in it + proxy.EXPECT().ExportMetaData().Return(bak.SsBackup.ClusterInfo, nil) + // mock ImportMetaData and return nil + proxy.EXPECT().ImportMetaData(gomock.Any()).Return(nil) + RecordID = "backup-id" + Expect(restore()).To(BeNil()) + }) + + // test getUserApproveInTerminal + Context("test userApproveInTerminal", func() { + // test user abort + It("user abort", func() { + // exec getUserApproveInTerminal + Expect(getUserApproveInTerminal()).To(Equal(xerr.NewCliErr("User abort"))) + }) + // TODO test user approve, how to patch os.Stdin? + }) + Context("restore data to ss proxy", func() { It("no need to drop database", func() { diff --git a/pitr/cli/internal/cmd/root.go b/pitr/cli/internal/cmd/root.go index 2485cfc8..7cf26482 100644 --- a/pitr/cli/internal/cmd/root.go +++ b/pitr/cli/internal/cmd/root.go @@ -18,6 +18,7 @@ package cmd import ( + "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model" "github.com/spf13/cobra" ) @@ -34,8 +35,10 @@ var ( AgentPort uint16 // BackupPath openGauss data backup path BackupPath string + // BackupModeStr openGauss data backup mode string (FULL or PTRACK) + BackupModeStr string // BackupMode openGauss data backup mode (FULL or PTRACK) - BackupMode string + BackupMode model.DBBackupMode // ThreadsNum openGauss data backup task thread num ThreadsNum uint8 // CSN openGauss data backup commit sequence number diff --git a/pitr/cli/internal/pkg/local-storage.go b/pitr/cli/internal/pkg/local-storage.go index b4367403..c7b70291 100644 --- a/pitr/cli/internal/pkg/local-storage.go +++ b/pitr/cli/internal/pkg/local-storage.go @@ -66,7 +66,7 @@ func NewLocalStorage(root string) (ILocalStorage, error) { } func DefaultRootDir() string { - return fmt.Sprintf("%s/%s", os.Getenv("HOME"), ".pitr") + return fmt.Sprintf("%s/%s", os.Getenv("HOME"), ".gs_pitr") } func (ls *localStorage) init() error { diff --git a/pitr/cli/internal/pkg/model/ls_backup.go b/pitr/cli/internal/pkg/model/ls_backup.go index 962007cf..3debe80e 100644 --- a/pitr/cli/internal/pkg/model/ls_backup.go +++ b/pitr/cli/internal/pkg/model/ls_backup.go @@ -26,10 +26,11 @@ type ( } BackupMetaInfo struct { - ID string `json:"id"` - CSN string `json:"csn"` - StartTime int64 `json:"start_time"` // Unix time - EndTime int64 `json:"end_time"` // Unix time + ID string `json:"id"` + CSN string `json:"csn"` + BackupMode DBBackupMode `json:"backup_mode"` + StartTime int64 `json:"start_time"` // Unix time + EndTime int64 `json:"end_time"` // Unix time } DataNode struct { @@ -59,10 +60,6 @@ type ( } StorageNodesInfo struct { - StorageNodes *StorageNodes `json:"storage_nodes"` - } - - StorageNodes struct { - List []*StorageNode `json:"sharding_db"` + StorageNodes map[string][]*StorageNode `json:"storage_nodes"` } ) diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy.go b/pitr/cli/internal/pkg/shardingsphere-proxy.go index ab85c9c9..f7803dfe 100644 --- a/pitr/cli/internal/pkg/shardingsphere-proxy.go +++ b/pitr/cli/internal/pkg/shardingsphere-proxy.go @@ -127,11 +127,11 @@ func (ss *shardingSphereProxy) ExportMetaData() (*model.ClusterInfo, error) { /* ExportStorageNodes 导出存储节点数据 -+-----------------------------+-------------------------+----------------------------------------+ -| id | create_time | data | -+-------------------------------------------------------+----------------------------------------+ -| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":{"sharding_db":[]}} | -+-------------------------------------------------------+----------------------------------------+ ++-----------------------------+-------------------------+--------------------------------------------+ +| id | create_time | data | ++-------------------------------------------------------+--------------------------------------------+ +| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":{"xx_db":[],"xx2_db":[]}} | ++-------------------------------------------------------+--------------------------------------------+ */ func (ss *shardingSphereProxy) ExportStorageNodes() ([]*model.StorageNode, error) { query, err := ss.db.Query(`EXPORT STORAGE NODES;`) @@ -156,7 +156,22 @@ func (ss *shardingSphereProxy) ExportStorageNodes() ([]*model.StorageNode, error if err = json.Unmarshal([]byte(data), &out); err != nil { return nil, fmt.Errorf("json unmarshal return err=%s", err) } - return out.StorageNodes.List, nil + + // get all storage nodes and filter duplicate nodes + var storageNodes []*model.StorageNode + var tmpNodesMap = make(map[string]struct{}) + for _, v := range out.StorageNodes { + for _, vv := range v { + // filter duplicate nodes + if _, ok := tmpNodesMap[fmt.Sprintf("%s:%d", vv.IP, vv.Port)]; ok { + continue + } + tmpNodesMap[fmt.Sprintf("%s:%d", vv.IP, vv.Port)] = struct{}{} + storageNodes = append(storageNodes, vv) + } + } + + return storageNodes, nil } // ImportMetaData 备份数据恢复 diff --git a/pitr/cli/pkg/gsutil/conn_test.go b/pitr/cli/pkg/gsutil/conn_test.go index c0a15a1d..193dd427 100644 --- a/pitr/cli/pkg/gsutil/conn_test.go +++ b/pitr/cli/pkg/gsutil/conn_test.go @@ -18,20 +18,41 @@ package gsutil import ( + "database/sql" + + "bou.ke/monkey" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) var _ = Describe("OpenGauss", func() { Context("Connection", func() { + It("empty user", func() { + og, err := Open("", "root", "postgres", "127.0.0.1", uint16(13308)) + Expect(err.Error()).To(Equal("user is empty")) + Expect(og).To(BeNil()) + }) + + It("empty password", func() { + og, err := Open("root", "", "postgres", "127.0.0.1", uint16(13308)) + Expect(err.Error()).To(Equal("password is empty")) + Expect(og).To(BeNil()) + }) + + It("empty database", func() { + og, err := Open("root", "root", "", "127.0.0.1", uint16(13308)) + Expect(err.Error()).To(Equal("db name is empty")) + Expect(og).To(BeNil()) + }) + It("Open and ping", func() { - Skip("Manually exec:dependent environment") + monkey.Patch(sql.Open, func(driverName, dataSourceName string) (*sql.DB, error) { + return &sql.DB{}, nil + }) + defer monkey.UnpatchAll() og, err := Open("root", "root", "postgres", "127.0.0.1", uint16(13308)) Expect(err).To(BeNil()) Expect(og).NotTo(BeNil()) - - err = og.Ping() - Expect(err).To(BeNil()) }) }) })