Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink committed Oct 10, 2023
1 parent 4e22d90 commit a2ff91d
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 104 deletions.
7 changes: 6 additions & 1 deletion cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -34,6 +35,7 @@ var createBackupCmd = &cobra.Command{
context := context.Background()
backupContext := core.CreateBackupContext(context, params)

start := time.Now().Unix()
var collectionNameArr []string
if collectionNames == "" {
collectionNameArr = []string{}
Expand All @@ -60,7 +62,10 @@ var createBackupCmd = &cobra.Command{
DbCollections: utils.WrapDBCollections(dbCollections),
Force: force,
})
fmt.Println(resp.GetCode(), "\n", resp.GetMsg())

fmt.Println(resp.GetMsg())
duration := time.Now().Unix() - start
fmt.Println(fmt.Sprintf("duration:%d s", duration))
},
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var deleteBackupCmd = &cobra.Command{
BackupName: deleteBackName,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
fmt.Println(resp.GetMsg())
},
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -39,6 +40,7 @@ var restoreBackupCmd = &cobra.Command{
context := context.Background()
backupContext := core.CreateBackupContext(context, params)
log.Info("restore cmd input args", zap.Strings("args", args))
start := time.Now().Unix()
var collectionNameArr []string
if restoreCollectionNames == "" {
collectionNameArr = []string{}
Expand Down Expand Up @@ -84,7 +86,9 @@ var restoreBackupCmd = &cobra.Command{
RestoreIndex: restoreIndex,
})

fmt.Println(resp.GetCode(), "\n", resp.GetMsg())
fmt.Println(resp.GetMsg())
duration := time.Now().Unix() - start
fmt.Println(fmt.Sprintf("duration:%d s", duration))
},
}

Expand Down
27 changes: 14 additions & 13 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/zilliztech/milvus-backup/core/proto/backuppb"
"github.com/zilliztech/milvus-backup/core/storage"
"github.com/zilliztech/milvus-backup/core/utils"
"github.com/zilliztech/milvus-backup/internal/common"
"github.com/zilliztech/milvus-backup/internal/log"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ type BackupContext struct {

restoreTasks map[string]*backuppb.RestoreBackupTask

//backupWorkerPool *common.WorkerPool
bulkinsertWorkerPool *common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
Expand Down Expand Up @@ -141,18 +142,18 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager {
return *b.storageClient
}

//func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool {
// if b.backupWorkerPool == nil {
// wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS)
// if err != nil {
// log.Error("failed to initial copy data woker pool", zap.Error(err))
// panic(err)
// }
// b.backupWorkerPool = wp
// b.backupWorkerPool.Start()
// }
// return b.backupWorkerPool
//}
func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool {
if b.bulkinsertWorkerPool == nil {
wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS)
if err != nil {
log.Error("failed to initial copy data woker pool", zap.Error(err))
panic(err)
}
b.bulkinsertWorkerPool = wp
b.bulkinsertWorkerPool.Start()
}
return b.bulkinsertWorkerPool
}

func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse {
if request.GetRequestId() == "" {
Expand Down
186 changes: 107 additions & 79 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,68 +462,119 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
}
}()

jobIds := make([]int64, 0)
for _, partitionBackup := range task.GetCollBackup().GetPartitionBackups() {
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
log.Error("fail to check has partition", zap.Error(err))
return task, err
}
if !exist {
err = retry.Do(ctx, func() error {
return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
}, retry.Attempts(10), retry.Sleep(1*time.Second))
partitionBackup2 := partitionBackup
job := func(ctx context.Context) error {
log.Info("start restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()))
_, err := b.restorePartition(ctx, targetDBName, targetCollectionName, partitionBackup2, task, isSameBucket, backupBucketName, backupPath, tempDir)
if err != nil {
log.Error("fail to create partition", zap.Error(err))
return task, err
log.Error("fail to restore partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup2.GetPartitionName()),
zap.Error(err))
return err
}
return err
}
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
jobId := b.getBackupWorkerPool().SubmitWithId(job)
jobIds = append(jobIds, jobId)
}

err = b.getBackupWorkerPool().WaitJobs(jobIds)
return task, err
}

func (b *BackupContext) restorePartition(ctx context.Context, targetDBName, targetCollectionName string,
partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) {
exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
if err != nil {
log.Error("fail to check has partition", zap.Error(err))
return task, err
}
if !exist {
err = retry.Do(ctx, func() error {
return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName())
}, retry.Attempts(10), retry.Sleep(1*time.Second))
if err != nil {
log.Error("fail to create partition", zap.Error(err))
return task, err
}
}
log.Info("create partition",
zap.String("collectionName", targetCollectionName),
zap.String("partitionName", partitionBackup.GetPartitionName()))

// bulk insert
copyAndBulkInsert := func(files []string) error {
realFiles := make([]string, len(files))
// if milvus bucket and backup bucket are not the same, should copy the data first
if !isSameBucket {
log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
for i, file := range files {
// empty delta file, no need to copy
if file == "" {
realFiles[i] = file
} else {
err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
if err != nil {
log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err))
return err
}
realFiles[i] = tempDir + file
}
} else {
realFiles = files
}
} else {
realFiles = files
}

err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if err != nil {
log.Error("fail to bulk insert to partition",
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
}
return nil
}

err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp))
if task.GetMetaOnly() {
task.Progress = 100
} else {
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
if err != nil {
log.Error("fail to bulk insert to partition",
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetDBName", targetDBName),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()),
zap.Error(err))
return err
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
return nil
}

if task.GetMetaOnly() {
task.Progress = 100
} else {
groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups())
if len(groupIds) == 1 && groupIds[0] == 0 {
// backward compatible old backup without group id
files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup)
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
Expand All @@ -541,39 +592,16 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
} else {
// bulk insert by segment groups
for _, groupId := range groupIds {
files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId)
if err != nil {
log.Error("fail to get partition backup binlog files",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
err = copyAndBulkInsert(files)
if err != nil {
log.Error("fail to (copy and) bulkinsert data",
zap.Error(err),
zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()),
zap.String("targetCollectionName", targetCollectionName),
zap.String("partition", partitionBackup.GetPartitionName()))
return task, err
}
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}
task.RestoredSize = task.RestoredSize + partitionBackup.GetSize()
if task.ToRestoreSize == 0 {
task.Progress = 100
} else {
task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize)
}
}

return task, err
return task, nil
}

func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 {
Expand Down
Loading

0 comments on commit a2ff91d

Please sign in to comment.