Skip to content

Commit

Permalink
Support concurrent backup and restore
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Oct 10, 2023
1 parent 0691b27 commit 7cefef7
Show file tree
Hide file tree
Showing 14 changed files with 897 additions and 286 deletions.
7 changes: 6 additions & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -34,6 +35,7 @@ var createBackupCmd = &cobra.Command{
context := context.Background()
backupContext := core.CreateBackupContext(context, params)

start := time.Now().Unix()
var collectionNameArr []string
if collectionNames == "" {
collectionNameArr = []string{}
Expand All @@ -60,7 +62,10 @@ var createBackupCmd = &cobra.Command{
DbCollections: utils.WrapDBCollections(dbCollections),
Force: force,
})
fmt.Println(resp.GetCode(), "\n", resp.GetMsg())

fmt.Println(resp.GetMsg())
duration := time.Now().Unix() - start
fmt.Println(fmt.Sprintf("duration:%d s", duration))
},
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var deleteBackupCmd = &cobra.Command{
BackupName: deleteBackName,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
fmt.Println(resp.GetMsg())
},
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -39,6 +40,7 @@ var restoreBackupCmd = &cobra.Command{
context := context.Background()
backupContext := core.CreateBackupContext(context, params)
log.Info("restore cmd input args", zap.Strings("args", args))
start := time.Now().Unix()
var collectionNameArr []string
if restoreCollectionNames == "" {
collectionNameArr = []string{}
Expand Down Expand Up @@ -84,7 +86,9 @@ var restoreBackupCmd = &cobra.Command{
RestoreIndex: restoreIndex,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
fmt.Println(resp.GetMsg())
duration := time.Now().Unix() - start
fmt.Println(fmt.Sprintf("duration:%d s", duration))
},
}

Expand Down
14 changes: 13 additions & 1 deletion configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,16 @@ minio:
backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath

backup:
maxSegmentGroupSize: 2G
maxSegmentGroupSize: 2G
parallelism: 2 # collection level parallelism to backup
copydata:
# thread pool to copy data for each collection backup, default 100.
# which means if you set backup.parallelism = 2 backup.copydata.parallelism = 100, there will be 200 copy executing at the same time.
# reduce it if blocks your storage's network bandwidth
parallelism: 100

restore:
# Collection level parallelism to restore
# Only change it > 1 when you have more than one datanode.
# Because the max parallelism of Milvus bulkinsert is equal to datanodes' number.
parallelism: 2
52 changes: 21 additions & 31 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
)

Expand All @@ -36,7 +37,7 @@ type BackupContext struct {
params paramtable.BackupParams

// milvus client
milvusClient *gomilvus.Client
milvusClient *MilvusClient

// data storage client
storageClient *storage.ChunkManager
Expand All @@ -50,7 +51,7 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

//copyWorkerPool *common.WorkerPool
bulkinsertWorkerPool *common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
Expand Down Expand Up @@ -88,35 +89,9 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s
}

func (b *BackupContext) Start() error {
// start milvus go SDK client
//milvusClient, err := CreateMilvusClient(b.ctx, b.params)
//if err != nil {
// log.Error("failed to initial milvus client", zap.Error(err))
// return err
//}
//b.milvusClient = milvusClient

// start milvus storage client
//minioClient, err := CreateStorageClient(b.ctx, b.params)
//if err != nil {
// log.Error("failed to initial storage client", zap.Error(err))
// return err
//}
//b.storageClient = minioClient

b.backupTasks = sync.Map{}
b.backupNameIdDict = sync.Map{}
b.restoreTasks = make(map[string]*backuppb.RestoreBackupTask)

// init worker pool
//wp, err := common.NewWorkerPool(b.ctx, WORKER_NUM, RPS)
//if err != nil {
// log.Error("failed to initial copy data woker pool", zap.Error(err))
// return err
//}
//b.copyWorkerPool = wp
//b.copyWorkerPool.Start()

b.started = true
return nil
}
Expand All @@ -141,16 +116,18 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B
}
}

func (b *BackupContext) getMilvusClient() gomilvus.Client {
func (b *BackupContext) getMilvusClient() *MilvusClient {
if b.milvusClient == nil {
milvusClient, err := CreateMilvusClient(b.ctx, b.params)
if err != nil {
log.Error("failed to initial milvus client", zap.Error(err))
panic(err)
}
b.milvusClient = &milvusClient
b.milvusClient = &MilvusClient{
client: milvusClient,
}
}
return *b.milvusClient
return b.milvusClient
}

func (b *BackupContext) getStorageClient() storage.ChunkManager {
Expand All @@ -165,6 +142,19 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
return *b.storageClient
}

func (b *BackupContext) getRestoreWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data woker pool", zap.Error(err))
panic(err)
}
b.bulkinsertWorkerPool = wp
b.bulkinsertWorkerPool.Start()
}
return b.bulkinsertWorkerPool
}

func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse {
if request.GetRequestId() == "" {
request.RequestId = utils.UUID()
Expand Down
Loading

0 comments on commit 7cefef7

Please sign in to comment.