Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine meta backup and restore process of local storage #8

Merged
merged 14 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewBackupCmd() *cobra.Command {
backupCmd.PersistentFlags().IntVar(&backupConfig.MaxSSHConnections, "connection", 5, "max ssh connection")
backupCmd.PersistentFlags().IntVar(&backupConfig.MaxConcurrent, "concurrent", 5, "max concurrent(for aliyun OSS)")
backupCmd.PersistentFlags().StringVar(&backupConfig.CommandArgs, "extra_args", "", "backup storage utils(oss/hdfs/s3) args for backup")
backupCmd.PersistentFlags().BoolVar(&backupConfig.Verbose, "verbose", false, "show backup detailed informations")

backupCmd.MarkPersistentFlagRequired("meta")
backupCmd.MarkPersistentFlagRequired("storage")
Expand Down Expand Up @@ -54,13 +55,21 @@ func newFullBackupCmd() *cobra.Command {
return err
}
defer logger.Sync() // flushes buffer, if any
b := backup.NewBackupClient(backupConfig, logger.Logger)
var b *backup.Backup
b, err = backup.NewBackupClient(backupConfig, logger.Logger)
if err != nil {
return err
}

fmt.Println("start to backup cluster...")
err = b.BackupCluster()
if err != nil {
return err
}
fmt.Println("backup successed")
fmt.Println("backup successed.")
if backupConfig.Verbose {
b.ShowSummaries()
}
return nil
},
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func NewRestoreCMD() *cobra.Command {
restoreCmd.PersistentFlags().StringVar(&restoreConfig.BackendUrl, "storage", "", "storage path")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.User, "user", "", "user for meta and storage")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.BackupName, "name", "", "backup name")
restoreCmd.PersistentFlags().BoolVar(&restoreConfig.AllowStandaloneMeta, "allow_standalone_meta", false, "if the target cluster with standalone meta service is allowed(for testing purpose)")
restoreCmd.PersistentFlags().IntVar(&restoreConfig.MaxConcurrent, "concurrent", 5, "max concurrent(for aliyun OSS)")
restoreCmd.PersistentFlags().StringVar(&restoreConfig.CommandArgs, "extra_args", "", "storage utils(oss/hdfs/s3) args for restore")

Expand Down Expand Up @@ -55,7 +54,12 @@ func newFullRestoreCmd() *cobra.Command {

defer logger.Sync() // flushes buffer, if any

r := restore.NewRestore(restoreConfig, logger.Logger)
var r *restore.Restore
r, err = restore.NewRestore(restoreConfig, logger.Logger)
if err != nil {
return err
}

err = r.RestoreCluster()
if err != nil {
return err
Expand Down
111 changes: 105 additions & 6 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backup

import (
"encoding/json"
"errors"
"fmt"
_ "os"
Expand All @@ -16,6 +17,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/vesoft-inc/nebula-br/pkg/config"
ctx0 "github.com/vesoft-inc/nebula-br/pkg/context"
"github.com/vesoft-inc/nebula-br/pkg/metaclient"
"github.com/vesoft-inc/nebula-br/pkg/remote"
"github.com/vesoft-inc/nebula-br/pkg/storage"
Expand Down Expand Up @@ -44,21 +46,50 @@ func (e *BackupError) Error() string {
return e.msg + e.Err.Error()
}

type backupEntry struct {
SrcPath string
DestUrl string
}
type idPathMap map[string][]backupEntry
type Backup struct {
config config.BackupConfig
metaLeader string
backendStorage storage.ExternalStorage
log *zap.Logger
metaFileName string
storageMap map[string]idPathMap
metaMap map[string]idPathMap
storeCtx *ctx0.Context
}

func NewBackupClient(cf config.BackupConfig, log *zap.Logger) *Backup {
backend, err := storage.NewExternalStorage(cf.BackendUrl, log, cf.MaxConcurrent, cf.CommandArgs)
func NewBackupClient(cf config.BackupConfig, log *zap.Logger) (*Backup, error) {
local_addr, err := remote.GetAddresstoReachRemote(strings.Split(cf.Meta, ":")[0], cf.User, log)
if err != nil {
log.Error("get local address failed", zap.Error(err))
return nil, err
}
log.Info("local address", zap.String("address", local_addr))
var (
storeCtx ctx0.Context
backend storage.ExternalStorage
)
backend, err = storage.NewExternalStorage(cf.BackendUrl, log, cf.MaxConcurrent, cf.CommandArgs,
&storeCtx)
if err != nil {
log.Error("new external storage failed", zap.Error(err))
return nil
return nil, err
}
return &Backup{config: cf, backendStorage: backend, log: log}

b := &Backup{config: cf, log: log,
storageMap: make(map[string]idPathMap),
metaMap: make(map[string]idPathMap),
storeCtx: &storeCtx}

b.storeCtx.LocalAddr = local_addr
b.storeCtx.Reporter = b
b.backendStorage = backend

return b, nil
}

func (b *Backup) dropBackup(name []byte) (*meta.ExecResp, error) {
Expand Down Expand Up @@ -149,6 +180,9 @@ func (b *Backup) BackupCluster() error {
}

meta := resp.GetMeta()
b.log.Info("response backup meta",
zap.String("backup.meta", metaclient.BackupMetaToString(meta)))

err = b.uploadAll(meta)
if err != nil {
return err
Expand All @@ -157,12 +191,27 @@ func (b *Backup) BackupCluster() error {
return nil
}

func (b *Backup) execPreUploadMetaCommand(metaDir string) error {
cmdStr := []string{"mkdir", "-p", metaDir}
b.log.Info("exec pre upload meta command", zap.Strings("cmd", cmdStr))
cmd := exec.Command(cmdStr[0], cmdStr[1:]...)
err := cmd.Run()
if err != nil {
return err
}
cmd.Wait()
return nil
}

func (b *Backup) uploadMeta(g *errgroup.Group, files []string) {

b.log.Info("will upload meta", zap.Int("sst file count", len(files)))
cmd := b.backendStorage.BackupMetaCommand(files)
b.log.Info("start upload meta", zap.String("addr", b.metaLeader))
ipAddr := strings.Split(b.metaLeader, ":")
b.storeCtx.RemoteAddr = ipAddr[0]

b.log.Info("will upload meta", zap.Int("sst file count", len(files)))
cmd := b.backendStorage.BackupMetaCommand(files)

func(addr string, user string, cmd string, log *zap.Logger) {
g.Go(func() error {
client, err := remote.NewClient(addr, user, log)
Expand Down Expand Up @@ -193,6 +242,9 @@ func (b *Backup) uploadStorage(g *errgroup.Group, dirs map[string][]spaceInfo) e
return err
}
i := 0

b.storeCtx.RemoteAddr = ipAddrs[0]

//We need to limit the number of ssh connections per storage node
for id2, cp := range idMap {
cmds := b.backendStorage.BackupStorageCommand(cp, k, id2)
Expand Down Expand Up @@ -254,6 +306,12 @@ func (b *Backup) uploadAll(meta *meta.BackupMeta) error {
return err
}

err = b.execPreUploadMetaCommand(b.backendStorage.BackupMetaDir())
if err != nil {
b.log.Error("exec pre uploadmeta command failed", zap.Error(err))
return err
}

var metaFiles []string
for _, f := range meta.GetMetaFiles() {
fileName := string(f[:])
Expand All @@ -271,6 +329,7 @@ func (b *Backup) uploadAll(meta *meta.BackupMeta) error {
}
}
}

err = b.uploadStorage(g, storageMap)
if err != nil {
return err
Expand Down Expand Up @@ -305,3 +364,43 @@ func (b *Backup) uploadAll(meta *meta.BackupMeta) error {

return nil
}

func (b *Backup) ShowSummaries() {
fmt.Printf("==== backup summeries ====\n")
fmt.Printf("localaddr : %s\n", b.storeCtx.LocalAddr)
fmt.Printf("backend.type : %s\n", b.backendStorage.Scheme())
fmt.Printf("backend.url : %s\n", b.backendStorage.URI())
fmt.Printf("tgt.meta.leader : %s\n", b.config.Meta)
if b.backendStorage.Scheme() == storage.SCHEME_LOCAL {
// if local, storages' snapshot would be copy to a path at that host.
b.showUploadSummaries(&b.metaMap, "tgt.meta.map")
b.showUploadSummaries(&b.storageMap, "tgt.storage.map")
}
fmt.Printf("==========================\n")
}

func (b *Backup) showUploadSummaries(m *map[string]idPathMap, msg string) {
o, _ := json.MarshalIndent(m, "", " ")
fmt.Printf("--- %s ---\n", msg)
fmt.Printf("%s\n", string(o))
}

func (b *Backup) doRecordUploading(m *map[string]idPathMap, spaceId string, host string, paths []string, desturl string) {
if (*m)[host] == nil {
(*m)[host] = make(idPathMap)
}
bes := []backupEntry{}
for _, p := range paths {
bes = append(bes, backupEntry{SrcPath: p, DestUrl: desturl})
}
(*m)[host][spaceId] = append((*m)[host][spaceId], bes[:]...)
}

func (b *Backup) StorageUploadingReport(spaceid string, host string, paths []string, desturl string) {
b.doRecordUploading(&b.storageMap, spaceid, host, paths, desturl)
}

func (b *Backup) MetaUploadingReport(host string, paths []string, desturl string) {
kDefaultSid := "0"
b.doRecordUploading(&b.metaMap, kDefaultSid, host, paths, desturl)
}
12 changes: 6 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ type BackupConfig struct {
// Only for OSS for now
MaxConcurrent int
CommandArgs string
Verbose bool
}

type RestoreConfig struct {
Meta string
BackendUrl string
MaxSSHConnections int
User string
BackupName string
AllowStandaloneMeta bool
Meta string
BackendUrl string
MaxSSHConnections int
User string
BackupName string
// Only for OSS for now
MaxConcurrent int
CommandArgs string
Expand Down
21 changes: 21 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package context

import (
_ "github.com/vesoft-inc/nebula-go/v2/nebula"
)

type BackendUploadTracker interface {
StorageUploadingReport(spaceid string, host string, paths []string, desturl string)
MetaUploadingReport(host string, paths []string, desturl string)
}

// NB - not thread-safe
type Context struct {
LocalAddr string // the address of br client
RemoteAddr string // the address of nebula service
Reporter BackendUploadTracker
}

func NewContext(localaddr string, r BackendUploadTracker) *Context {
return &Context{LocalAddr: localaddr, Reporter: r}
}
18 changes: 18 additions & 0 deletions pkg/metaclient/util.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
package metaclient

import (
"encoding/json"
"strconv"

"github.com/vesoft-inc/nebula-go/v2/nebula"
"github.com/vesoft-inc/nebula-go/v2/nebula/meta"
)

func HostaddrToString(host *nebula.HostAddr) string {
return host.Host + ":" + strconv.Itoa(int(host.Port))
}

func BackupMetaToString(m *meta.BackupMeta) string {
mstr, err := json.Marshal(m)
if err != nil {
return ""
}
return string(mstr)
}

func ListClusterInfoRespToString(m *meta.ListClusterInfoResp) string {
mstr, err := json.Marshal(m)
if err != nil {
return ""
}
return string(mstr)
}
8 changes: 8 additions & 0 deletions pkg/remote/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ func NewClientPool(addr string, user string, log *zap.Logger, count int) ([]*Cli
return clients, nil
}

func GetAddresstoReachRemote(addr string, user string, log *zap.Logger) (string, error) {
if cli, err := NewClient(addr, user, log); err == nil {
return strings.Split(cli.client.Conn.LocalAddr().String(), ":")[0], nil
} else {
return "", err
}
}

func (c *Client) Close() {
c.client.Close()
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/remote/ssh_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package remote

import (
"flag"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
_ "golang.org/x/crypto/ssh"
)

var remoteAddr = flag.String("addr", "", "remote ssh addr for test")
var remoteUser = flag.String("user", "", "remote user for test")

func TestClient(t *testing.T) {
ast := assert.New(t)
if *remoteAddr == "" || *remoteUser == "" {
t.Log("addr and user should be provided!")
return
}

logger, _ := zap.NewProduction()
cli, err := NewClient(*remoteAddr, *remoteUser, logger)
ast.Nil(err)

t.Logf("ssh user: %s", cli.client.Conn.User())
t.Logf("local addr: %s, remote addr: %s",
cli.client.Conn.LocalAddr().String(),
cli.client.Conn.RemoteAddr().String())
}

func TestGetLocalAddress(t *testing.T) {
ast := assert.New(t)
if *remoteAddr == "" || *remoteUser == "" {
t.Log("addr and user should be provided!")
return
}
logger, _ := zap.NewProduction()
laddr, err := GetAddresstoReachRemote(*remoteAddr, *remoteUser, logger)
ast.Nil(err)

t.Logf("local addr: %s", laddr)
}
Loading