Skip to content

Commit

Permalink
bugfix(pitr): fix storage nodes info struct error, add endtime to bac…
Browse files Browse the repository at this point in the history
…kup file, and fix backup mode ptrack did not work. (#288)
  • Loading branch information
Xu-Wentao authored Mar 29, 2023
1 parent 8ac7dbc commit 91ccfa0
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 50 deletions.
8 changes: 3 additions & 5 deletions pitr/agent/internal/handler/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ package handler
import (
"errors"
"fmt"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"

"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/handler/view"

"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"
"github.com/gofiber/fiber/v2"

"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons"
)

func Backup(ctx *fiber.Ctx) error {
Expand Down
2 changes: 2 additions & 0 deletions pitr/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ go 1.20
require (
bou.ke/monkey v1.0.2
gitee.com/opengauss/openGauss-connector-go-pq v1.0.4
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/jarcoal/httpmock v1.3.0
github.com/onsi/ginkgo/v2 v2.9.1
github.com/onsi/gomega v1.27.3
github.com/spf13/cobra v1.6.1
Expand Down
5 changes: 5 additions & 0 deletions pitr/cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
gitee.com/opengauss/openGauss-connector-go-pq v1.0.4 h1:npfLM9/QpkmdK+XY9X2pcC2EX5gosyn/6dRDRd2sEJs=
gitee.com/opengauss/openGauss-connector-go-pq v1.0.4/go.mod h1:2UEp+ug6ls6C0pLfZgBn7VBzBntFUzxJuy+6FlQ7qyI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down Expand Up @@ -52,6 +54,8 @@ github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc=
github.com/jarcoal/httpmock v1.3.0/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
Expand All @@ -60,6 +64,7 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04nTH68g=
github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk=
github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzmGAPfuo=
github.com/onsi/gomega v1.27.3 h1:5VwIwnBY3vbBDOJrNtA4rVdiTZCsq9B5F12pvy1Drmk=
Expand Down
69 changes: 50 additions & 19 deletions pitr/cli/internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ var BackupCmd = &cobra.Command{
fmt.Printf("Flag: %s Value: %s\n", flag.Name, flag.Value)
})

// convert BackupModeStr to BackupMode
switch BackupModeStr {
case "FULL", "full":
BackupMode = model.BDBackModeFull
case "PTRACK", "ptrack":
BackupMode = model.DBBackModePTrack
}
if BackupMode == model.DBBackModePTrack {
logging.Warn("Please make sure all openGauss nodes have been set correct configuration about ptrack. You can refer to https://support.huaweicloud.com/intl/zh-cn/devg-opengauss/opengauss_devg_1362.html for more details.")
}

logging.Info(fmt.Sprintf("Default backup path: %s", pkg.DefaultRootDir()))

// Start backup
Expand All @@ -71,7 +82,7 @@ func init() {
_ = BackupCmd.MarkFlagRequired("password")
BackupCmd.Flags().StringVarP(&BackupPath, "dn-backup-path", "B", "", "openGauss data backup path")
_ = BackupCmd.MarkFlagRequired("dn-backup-path")
BackupCmd.Flags().StringVarP(&BackupMode, "dn-backup-mode", "b", "", "openGauss data backup mode (FULL|PTRACK)")
BackupCmd.Flags().StringVarP(&BackupModeStr, "dn-backup-mode", "b", "", "openGauss data backup mode (FULL|PTRACK)")
_ = BackupCmd.MarkFlagRequired("dn-backup-mode")
BackupCmd.Flags().Uint8VarP(&ThreadsNum, "dn-threads-num", "j", 1, "openGauss data backup threads nums")
BackupCmd.Flags().Uint16VarP(&AgentPort, "agent-port", "a", 443, "agent server port")
Expand All @@ -88,6 +99,7 @@ func init() {
// 6. Update local backup info
// 7. Double check backups all finished
func backup() error {
var err error
proxy, err := pkg.NewShardingSphereProxy(Username, Password, pkg.DefaultDbName, Host, Port)
if err != nil {
return xerr.NewCliErr("create ss-proxy connect failed")
Expand All @@ -98,48 +110,65 @@ func backup() error {
return xerr.NewCliErr("create local storage failed")
}

defer func() {
if err != nil {
logging.Info("try to unlock cluster ...")
if err := proxy.Unlock(); err != nil {
logging.Error(fmt.Sprintf("coz backup failed, try to unlock cluster, but still failed, err:%s", err.Error()))
}
}
}()

// Step1. lock cluster
if err := proxy.LockForBackup(); err != nil {
logging.Info("Starting lock cluster ...")
err = proxy.LockForBackup()
if err != nil {
return xerr.NewCliErr("lock for backup failed")
}

// Step2. Get cluster info and save local backup info
logging.Info("Starting export metadata ...")
lsBackup, err := exportData(proxy, ls)
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("export backup data failed, err:%s", err.Error()))
}

logging.Info(fmt.Sprintf("export backup data success, backup filename: %s", filename))
logging.Info(fmt.Sprintf("Export backup data success, backup filename: %s", filename))

// Step3. send backup command to agent-server.
if err := execBackup(lsBackup); err != nil {
// if backup failed, still need to unlock cluster.
if err := proxy.Unlock(); err != nil {
logging.Error(fmt.Sprintf("coz exec backup failed, try to unlock cluster, but still failed, err:%s", err.Error()))
}
logging.Info("Starting backup ...")
err = execBackup(lsBackup)
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("exec backup failed, err:%s", err.Error()))
}

// Step4. unlock cluster
if err := proxy.Unlock(); err != nil {
logging.Info("Starting unlock cluster ...")
err = proxy.Unlock()
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("unlock cluster failed, err:%s", err.Error()))
}

// Step5. update backup file
if err := ls.WriteByJSON(filename, lsBackup); err != nil {
logging.Info("Starting update backup file ...")
err = ls.WriteByJSON(filename, lsBackup)
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err:%s", err.Error()))
}

// Step6. check agent server backup status
// Step6. check agent server backup
logging.Info("Starting check backup status ...")
status := checkBackupStatus(lsBackup)
logging.Info(fmt.Sprintf("backup result:%s", status))
logging.Info(fmt.Sprintf("Backup result: %s", status))

// Step7. finished backup and update backup file
if err := ls.WriteByJSON(filename, lsBackup); err != nil {
logging.Info("Starting update backup file ...")
err = ls.WriteByJSON(filename, lsBackup)
if err != nil {
return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err: %s", err.Error()))
}

logging.Info("backup finished")
logging.Info("Backup finished!")
return nil

}
Expand All @@ -166,10 +195,11 @@ func exportData(proxy pkg.IShardingSphereProxy, ls pkg.ILocalStorage) (lsBackup

contents := &model.LsBackup{
Info: &model.BackupMetaInfo{
ID: uuid.New().String(), // generate uuid for this backup
CSN: csn,
StartTime: time.Now().Unix(),
EndTime: 0,
ID: uuid.New().String(), // generate uuid for this backup
CSN: csn,
StartTime: time.Now().Unix(),
EndTime: 0,
BackupMode: BackupMode,
},
SsBackup: &model.SsBackup{
Status: model.SsBackupStatusWaiting, // default status of backup is model.SsBackupStatusWaiting
Expand Down Expand Up @@ -231,7 +261,7 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.
Password: node.Password,
DnBackupPath: BackupPath,
DnThreadsNum: ThreadsNum,
DnBackupMode: model.BDBackModeFull,
DnBackupMode: BackupMode,
Instance: defaultInstance,
}
backupID, err := as.Backup(in)
Expand Down Expand Up @@ -297,6 +327,7 @@ func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
}

lsBackup.SsBackup.Status = backupFinalStatus
lsBackup.Info.EndTime = time.Now().Unix()
return backupFinalStatus
}

Expand Down
1 change: 1 addition & 0 deletions pitr/cli/internal/cmd/cmd_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func initLog() {
}
logging.Init(logger)
}

func TestCmd(t *testing.T) {
initLog()
RegisterFailHandler(Fail)
Expand Down
5 changes: 5 additions & 0 deletions pitr/cli/internal/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ func checkDatabaseExist(proxy pkg.IShardingSphereProxy, bak *model.LsBackup) err
databaseNamesExist = append(databaseNamesExist, k)
}
}

if len(databaseNamesExist) == 0 {
return nil
}

// get user input to confirm
return getUserApproveInTerminal()
}
Expand Down
43 changes: 38 additions & 5 deletions pitr/cli/internal/cmd/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package cmd

import (
"reflect"

"bou.ke/monkey"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
mock_pkg "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/mocks"
Expand Down Expand Up @@ -60,9 +62,9 @@ var _ = Describe("Restore", func() {
var _ = Describe("test restore", func() {
var (
proxy *mock_pkg.MockIShardingSphereProxy
//ls *mock_pkg.MockILocalStorage
as *mock_pkg.MockIAgentServer
bak = &model.LsBackup{
ls *mock_pkg.MockILocalStorage
as *mock_pkg.MockIAgentServer
bak = &model.LsBackup{
Info: nil,
DnList: nil,
SsBackup: &model.SsBackup{
Expand All @@ -87,19 +89,27 @@ var _ = Describe("test restore", func() {
Remark: "",
}
)

BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
proxy = mock_pkg.NewMockIShardingSphereProxy(ctrl)
as = mock_pkg.NewMockIAgentServer(ctrl)
//ls = mock_pkg.NewMockILocalStorage(ctrl)
monkey.Patch(getUserApproveInTerminal, func() error { return nil })
ls = mock_pkg.NewMockILocalStorage(ctrl)
monkey.Patch(pkg.NewShardingSphereProxy, func(user, password, database, host string, port uint16) (pkg.IShardingSphereProxy, error) {
return proxy, nil
})
monkey.Patch(pkg.NewLocalStorage, func(rootDir string) (pkg.ILocalStorage, error) {
return ls, nil
})
})

AfterEach(func() {
ctrl.Finish()
monkey.UnpatchAll()
})

It("check database if exists", func() {
monkey.Patch(getUserApproveInTerminal, func() error { return nil })
proxy.EXPECT().ExportMetaData()
Expect(checkDatabaseExist(proxy, bak)).To(BeNil())
})
Expand All @@ -112,6 +122,29 @@ var _ = Describe("test restore", func() {
Expect(<-failedCh).To(BeNil())
})

It("test exec restore main func", func() {
// patch ReadByID of mock ls
monkey.PatchInstanceMethod(reflect.TypeOf(ls), "ReadByID", func(_ *mock_pkg.MockILocalStorage, _ string) (*model.LsBackup, error) {
return bak, nil
})
// mock ExportMetaData and return a *ClusterInfo with bak in it
proxy.EXPECT().ExportMetaData().Return(bak.SsBackup.ClusterInfo, nil)
// mock ImportMetaData and return nil
proxy.EXPECT().ImportMetaData(gomock.Any()).Return(nil)
RecordID = "backup-id"
Expect(restore()).To(BeNil())
})

// test getUserApproveInTerminal
Context("test userApproveInTerminal", func() {
// test user abort
It("user abort", func() {
// exec getUserApproveInTerminal
Expect(getUserApproveInTerminal()).To(Equal(xerr.NewCliErr("User abort")))
})
// TODO test user approve, how to patch os.Stdin?
})

Context("restore data to ss proxy", func() {

It("no need to drop database", func() {
Expand Down
5 changes: 4 additions & 1 deletion pitr/cli/internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cmd

import (
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
"github.com/spf13/cobra"
)

Expand All @@ -34,8 +35,10 @@ var (
AgentPort uint16
// BackupPath openGauss data backup path
BackupPath string
// BackupModeStr openGauss data backup mode string (FULL or PTRACK)
BackupModeStr string
// BackupMode openGauss data backup mode (FULL or PTRACK)
BackupMode string
BackupMode model.DBBackupMode
// ThreadsNum openGauss data backup task thread num
ThreadsNum uint8
// CSN openGauss data backup commit sequence number
Expand Down
2 changes: 1 addition & 1 deletion pitr/cli/internal/pkg/local-storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewLocalStorage(root string) (ILocalStorage, error) {
}

func DefaultRootDir() string {
return fmt.Sprintf("%s/%s", os.Getenv("HOME"), ".pitr")
return fmt.Sprintf("%s/%s", os.Getenv("HOME"), ".gs_pitr")
}

func (ls *localStorage) init() error {
Expand Down
15 changes: 6 additions & 9 deletions pitr/cli/internal/pkg/model/ls_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ type (
}

BackupMetaInfo struct {
ID string `json:"id"`
CSN string `json:"csn"`
StartTime int64 `json:"start_time"` // Unix time
EndTime int64 `json:"end_time"` // Unix time
ID string `json:"id"`
CSN string `json:"csn"`
BackupMode DBBackupMode `json:"backup_mode"`
StartTime int64 `json:"start_time"` // Unix time
EndTime int64 `json:"end_time"` // Unix time
}

DataNode struct {
Expand Down Expand Up @@ -59,10 +60,6 @@ type (
}

StorageNodesInfo struct {
StorageNodes *StorageNodes `json:"storage_nodes"`
}

StorageNodes struct {
List []*StorageNode `json:"sharding_db"`
StorageNodes map[string][]*StorageNode `json:"storage_nodes"`
}
)
Loading

0 comments on commit 91ccfa0

Please sign in to comment.