
Commit

update
wayblink committed Oct 10, 2023
1 parent 4e22d90 commit 41fd4ec
Showing 9 changed files with 237 additions and 109 deletions.
7 changes: 6 additions & 1 deletion cmd/create.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
@@ -34,6 +35,7 @@ var createBackupCmd = &cobra.Command{
context := context.Background()
backupContext := core.CreateBackupContext(context, params)

start := time.Now().Unix()
var collectionNameArr []string
if collectionNames == "" {
collectionNameArr = []string{}
@@ -60,7 +62,10 @@ var createBackupCmd = &cobra.Command{
DbCollections: utils.WrapDBCollections(dbCollections),
Force: force,
})
fmt.Println(resp.GetCode(), "\n", resp.GetMsg())

fmt.Println(resp.GetMsg())
duration := time.Now().Unix() - start
fmt.Println(fmt.Sprintf("duration:%d s", duration))
},
}

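The create command now measures how long the backup takes: it records a Unix timestamp before issuing the request and prints the elapsed seconds after the response comes back. A minimal standalone sketch of the same timing pattern, with a hypothetical runBackup standing in for the real CreateBackup call:

```go
package main

import (
	"fmt"
	"time"
)

// runBackup is a placeholder for the long-running backup request.
func runBackup() error {
	time.Sleep(2 * time.Second)
	return nil
}

func main() {
	start := time.Now().Unix() // whole-second granularity, as in the command above
	if err := runBackup(); err != nil {
		fmt.Println("backup failed:", err)
		return
	}
	duration := time.Now().Unix() - start
	fmt.Printf("duration:%d s\n", duration)
}
```

Using time.Since on a time.Time would give sub-second precision; the Unix-seconds form mirrors what the commit does and is enough for multi-second backups.
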
2 changes: 1 addition & 1 deletion cmd/delete.go
@@ -30,7 +30,7 @@ var deleteBackupCmd = &cobra.Command{
BackupName: deleteBackName,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
fmt.Println(resp.GetMsg())
},
}

6 changes: 5 additions & 1 deletion cmd/restore.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
@@ -39,6 +40,7 @@ var restoreBackupCmd = &cobra.Command{
context := context.Background()
backupContext := core.CreateBackupContext(context, params)
log.Info("restore cmd input args", zap.Strings("args", args))
start := time.Now().Unix()
var collectionNameArr []string
if restoreCollectionNames == "" {
collectionNameArr = []string{}
@@ -84,7 +86,9 @@ var restoreBackupCmd = &cobra.Command{
RestoreIndex: restoreIndex,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
fmt.Println(resp.GetMsg())
duration := time.Now().Unix() - start
fmt.Println(fmt.Sprintf("duration:%d s", duration))
},
}

6 changes: 3 additions & 3 deletions configs/backup.yaml
@@ -39,15 +39,15 @@ minio:

backup:
maxSegmentGroupSize: 2G
parallelism: 1 # collection level parallelism to backup, default 1
parallelism: 2 # collection level parallelism to backup
copydata:
# thread pool to copy data for each collection backup, default 100.
# which means if you set backup.parallelism = 2 and backup.copydata.parallelism = 100, there will be up to 200 copies executing at the same time.
# reduce it if this saturates your storage's network bandwidth
parallelism: 100

restore:
# Collection level parallelism to restore, default 1
# Collection level parallelism to restore
# Only change it > 1 when you have more than one datanode.
# Because the max parallelism of Milvus bulkinsert is equal to datanodes' number.
parallelism: 1
parallelism: 2
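As the inline comments explain, copy concurrency multiplies across the two settings: backup.parallelism collections in flight, each with its own copydata pool, so parallelism 2 with copydata.parallelism 100 allows up to 200 concurrent object copies. A hedged example of a tuned configuration for a bandwidth-constrained object store (the values are illustrative, not recommendations):

```yaml
backup:
  maxSegmentGroupSize: 2G
  parallelism: 2        # two collections backed up concurrently
  copydata:
    parallelism: 50     # 2 x 50 = at most 100 concurrent object copies

restore:
  parallelism: 2        # only worth raising above 1 with more than one datanode
```
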
27 changes: 14 additions & 13 deletions core/backup_context.go
@@ -13,6 +13,7 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
)

@@ -50,7 +51,7 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

//backupWorkerPool *common.WorkerPool
bulkinsertWorkerPool *common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
@@ -141,18 +142,18 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
return *b.storageClient
}

//func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool {
// if b.backupWorkerPool == nil {
// wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS)
// if err != nil {
// log.Error("failed to initial copy data woker pool", zap.Error(err))
// panic(err)
// }
// b.backupWorkerPool = wp
// b.backupWorkerPool.Start()
// }
// return b.backupWorkerPool
//}
func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.RestoreParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data woker pool", zap.Error(err))
panic(err)
}
b.bulkinsertWorkerPool = wp
b.bulkinsertWorkerPool.Start()
}
return b.bulkinsertWorkerPool
}

func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse {
if request.GetRequestId() == "" {
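getBackupWorkerPool replaces the commented-out variant: it lazily builds a shared worker pool sized by the restore parallelism setting, starts it on first use, and reuses it afterwards. A self-contained sketch of that lazy-init-and-start pattern follows; the pool type below is illustrative rather than the project's common.WorkerPool, and a sync.Once is used to make first-use initialization goroutine-safe (an assumption, since the snippet above checks a nil field instead):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// workerPool is a stand-in for the pool started by the real code.
type workerPool struct{ workers int }

func newWorkerPool(workers int) *workerPool { return &workerPool{workers: workers} }

func (p *workerPool) Start(ctx context.Context) {
	fmt.Println("pool started with", p.workers, "workers")
}

type backupContext struct {
	ctx         context.Context
	parallelism int
	poolOnce    sync.Once
	pool        *workerPool
}

// getWorkerPool builds and starts the pool on first call, then reuses it.
func (b *backupContext) getWorkerPool() *workerPool {
	b.poolOnce.Do(func() {
		b.pool = newWorkerPool(b.parallelism)
		b.pool.Start(b.ctx)
	})
	return b.pool
}

func main() {
	b := &backupContext{ctx: context.Background(), parallelism: 2}
	_ = b.getWorkerPool()
	_ = b.getWorkerPool() // second call returns the already-started pool
}
```
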
186 changes: 107 additions & 79 deletions core/backup_impl_restore_backup.go
@@ -462,68 +462,119 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}
}()

jobIds := make([]int64, 0)
for _, partitionBackup := range task.GetCollBackup().GetPartitionBackups() {
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
log.Error("fail to check has partition", zap.Error(err))
return task, err
}
if !exist {
err = retry.Do(ctx, func() error {
return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
}, retry.Attempts(10), retry.Sleep(1*time.Second))
partitionBackup2 := partitionBackup
job := func(ctx context.Context) error {
log.Info("start restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()))
_, err := b.restorePartition(ctx, targetDBName, targetCollectionName, partitionBackup2, task, isSameBucket, backupBucketName, backupPath, tempDir)
if err != nil {
log.Error("fail to create partition", zap.Error(err))
return task, err
log.Error("fail to restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()),
zap.Error(err))
return err
}
return err
}
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
jobId := b.getBackupWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
}

err = b.getBackupWorkerPool().WaitJobs(jobIds)
return task, err
}

func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targetCollectionName string,
partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) {
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
log.Error("fail to check has partition", zap.Error(err))
return task, err
}
if !exist {
err = retry.Do(ctx, func() error {
return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
}, retry.Attempts(10), retry.Sleep(1*time.Second))
if err != nil {
log.Error("fail to create partition", zap.Error(err))
return task, err
}
}
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
}
} else {
realFiles = files
}
} else {
realFiles = files
}

err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
}
return nil
}

err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if task.GetMetaOnly() {
task.Progress = 100
} else {
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to bulk insert to partition",
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
return nil
}

if task.GetMetaOnly() {
task.Progress = 100
} else {
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
@@ -541,39 +592,16 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}

return task, err
return task, nil
}

func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 {
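The refactor extracts restorePartition and fans partitions out through the shared worker pool: each partition backup becomes a job closure (note the partitionBackup2 := partitionBackup copy to pin the loop variable), SubmitWithId returns a job id, and WaitJobs blocks until every submitted job finishes or one fails. A minimal sketch of the same fan-out/join shape, using golang.org/x/sync/errgroup in place of the project's common.WorkerPool; the partition names and restoreOne are illustrative:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// restoreOne is a stand-in for restoring a single partition.
func restoreOne(ctx context.Context, partition string) error {
	fmt.Println("restoring partition", partition)
	return nil
}

func restoreAll(ctx context.Context, partitions []string, parallelism int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(parallelism) // cap concurrency, like restore.parallelism in backup.yaml
	for _, p := range partitions {
		p := p // pin the loop variable, as the diff does with partitionBackup2
		g.Go(func() error { return restoreOne(ctx, p) })
	}
	return g.Wait() // join: returns the first error, if any
}

func main() {
	if err := restoreAll(context.Background(), []string{"p0", "p1", "p2"}, 2); err != nil {
		fmt.Println("restore failed:", err)
	}
}
```
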
11 changes: 9 additions & 2 deletions core/milvus_sdk_wrapper.go
@@ -4,7 +4,9 @@ import (
"context"
gomilvus "github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/zilliztech/milvus-backup/internal/util/retry"
"sync"
"time"
)

// MilvusClient wrap db into milvus API to make it thread safe
@@ -130,7 +132,10 @@ func (m *MilvusClient) CreateCollection(ctx context.Context, db string, schema *
if err != nil {
return err
}
return m.client.CreateCollection(ctx, schema, shardsNum, opts...)
// add retry to make sure won't be block by rate control
return retry.Do(ctx, func() error {
return m.client.CreateCollection(ctx, schema, shardsNum, opts...)
}, retry.Sleep(2*time.Second), retry.Attempts(10))
}

func (m *MilvusClient) CreatePartition(ctx context.Context, db, collName string, partitionName string) error {
@@ -140,7 +145,9 @@ func (m *MilvusClient) CreatePartition(ctx context.Context, db, collName string,
if err != nil {
return err
}
return m.client.CreatePartition(ctx, collName, partitionName)
return retry.Do(ctx, func() error {
return m.client.CreatePartition(ctx, collName, partitionName)
}, retry.Sleep(2*time.Second), retry.Attempts(10))
}

func (m *MilvusClient) HasPartition(ctx context.Context, db, collName string, partitionName string) (bool, error) {
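CreateCollection and CreatePartition are now wrapped in retry.Do with a 2-second sleep and up to 10 attempts, so a transient rejection from Milvus rate control no longer fails the whole restore. A self-contained sketch of what such a fixed-interval retry helper can look like; this is an illustration, not the project's internal/util/retry implementation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithRetry calls fn up to attempts times, sleeping between failures,
// and gives up early if the context is cancelled.
func doWithRetry(ctx context.Context, fn func() error, attempts int, sleep time.Duration) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		if lastErr = fn(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}
	}
	return lastErr
}

func main() {
	calls := 0
	// Simulate a call that is rate-limited twice before succeeding.
	err := doWithRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("rate limited")
		}
		return nil
	}, 10, 100*time.Millisecond) // the commit itself uses a 2s sleep and 10 attempts
	fmt.Println("calls:", calls, "err:", err)
}
```
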

