From a2ff91d35f1b29f13bf59dc422910cfe1aa754a4 Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 10 Oct 2023 17:24:16 +0800 Subject: [PATCH] update --- cmd/create.go | 7 +- cmd/delete.go | 2 +- cmd/restore.go | 6 +- core/backup_context.go | 27 +++-- core/backup_impl_restore_backup.go | 186 +++++++++++++++++------------ internal/common/workerpool.go | 73 +++++++++-- internal/common/workerpool_test.go | 28 +++++ 7 files changed, 225 insertions(+), 104 deletions(-) diff --git a/cmd/create.go b/cmd/create.go index 7b8b81e3..313c92c4 100644 --- a/cmd/create.go +++ b/cmd/create.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" jsoniter "github.com/json-iterator/go" "github.com/spf13/cobra" @@ -34,6 +35,7 @@ var createBackupCmd = &cobra.Command{ context := context.Background() backupContext := core.CreateBackupContext(context, params) + start := time.Now().Unix() var collectionNameArr []string if collectionNames == "" { collectionNameArr = []string{} @@ -60,7 +62,10 @@ var createBackupCmd = &cobra.Command{ DbCollections: utils.WrapDBCollections(dbCollections), Force: force, }) - fmt.Println(resp.GetCode(), "\n", resp.GetMsg()) + + fmt.Println(resp.GetMsg()) + duration := time.Now().Unix() - start + fmt.Println(fmt.Sprintf("duration:%d s", duration)) }, } diff --git a/cmd/delete.go b/cmd/delete.go index e38701ca..a0736876 100644 --- a/cmd/delete.go +++ b/cmd/delete.go @@ -30,7 +30,7 @@ var deleteBackupCmd = &cobra.Command{ BackupName: deleteBackName, }) - fmt.Println(resp.GetCode(), "\n", resp.GetMsg()) + fmt.Println(resp.GetMsg()) }, } diff --git a/cmd/restore.go b/cmd/restore.go index ccc061e2..ae0cadcb 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" jsoniter "github.com/json-iterator/go" "github.com/spf13/cobra" @@ -39,6 +40,7 @@ var restoreBackupCmd = &cobra.Command{ context := context.Background() backupContext := core.CreateBackupContext(context, params) log.Info("restore cmd input args", zap.Strings("args", args)) + start := time.Now().Unix() var collectionNameArr []string if restoreCollectionNames == "" { collectionNameArr = []string{} @@ -84,7 +86,9 @@ var restoreBackupCmd = &cobra.Command{ RestoreIndex: restoreIndex, }) - fmt.Println(resp.GetCode(), "\n", resp.GetMsg()) + fmt.Println(resp.GetMsg()) + duration := time.Now().Unix() - start + fmt.Println(fmt.Sprintf("duration:%d s", duration)) }, } diff --git a/core/backup_context.go b/core/backup_context.go index c40054a0..83bbed4d 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -13,6 +13,7 @@ import ( "github.com/zilliztech/milvus-backup/core/proto/backuppb" "github.com/zilliztech/milvus-backup/core/storage" "github.com/zilliztech/milvus-backup/core/utils" + "github.com/zilliztech/milvus-backup/internal/common" "github.com/zilliztech/milvus-backup/internal/log" ) @@ -50,7 +51,7 @@ type BackupContext struct { restoreTasks map[string]*backuppb.RestoreBackupTask - //backupWorkerPool *common.WorkerPool + bulkinsertWorkerPool *common.WorkerPool } func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) { @@ -141,18 +142,18 @@ func (b *BackupContext) getStorageClient() storage.ChunkManager { return *b.storageClient } -//func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool { -// if b.backupWorkerPool == nil { -// wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS) -// if err != nil { -// log.Error("failed to initial copy data woker pool", zap.Error(err)) -// 
panic(err) -// } -// b.backupWorkerPool = wp -// b.backupWorkerPool.Start() -// } -// return b.backupWorkerPool -//} +func (b *BackupContext) getBackupWorkerPool() *common.WorkerPool { + if b.bulkinsertWorkerPool == nil { + wp, err := common.NewWorkerPool(b.ctx, b.params.BackupCfg.BackupParallelism, RPS) + if err != nil { + log.Error("failed to initial copy data woker pool", zap.Error(err)) + panic(err) + } + b.bulkinsertWorkerPool = wp + b.bulkinsertWorkerPool.Start() + } + return b.bulkinsertWorkerPool +} func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBackupRequest) *backuppb.BackupInfoResponse { if request.GetRequestId() == "" { diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go index ca461c49..e04e17da 100644 --- a/core/backup_impl_restore_backup.go +++ b/core/backup_impl_restore_backup.go @@ -462,68 +462,119 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup } }() + jobIds := make([]int64, 0) for _, partitionBackup := range task.GetCollBackup().GetPartitionBackups() { - exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) - if err != nil { - log.Error("fail to check has partition", zap.Error(err)) - return task, err - } - if !exist { - err = retry.Do(ctx, func() error { - return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) - }, retry.Attempts(10), retry.Sleep(1*time.Second)) + partitionBackup2 := partitionBackup + job := func(ctx context.Context) error { + log.Info("start restore partition", + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup2.GetPartitionName())) + _, err := b.restorePartition(ctx, targetDBName, targetCollectionName, partitionBackup2, task, isSameBucket, backupBucketName, backupPath, tempDir) if err != nil { - log.Error("fail to create partition", zap.Error(err)) - return task, err + log.Error("fail to restore partition", + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup2.GetPartitionName()), + zap.Error(err)) + return err } + return err } - log.Info("create partition", - zap.String("collectionName", targetCollectionName), - zap.String("partitionName", partitionBackup.GetPartitionName())) - - // bulk insert - copyAndBulkInsert := func(files []string) error { - realFiles := make([]string, len(files)) - // if milvus bucket and backup bucket are not the same, should copy the data first - if !isSameBucket { - log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files)) - for i, file := range files { - // empty delta file, no need to copy - if file == "" { - realFiles[i] = file - } else { - err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file) - if err != nil { - log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err)) - return err - } - realFiles[i] = tempDir + file + jobId := b.getBackupWorkerPool().SubmitWithId(job) + jobIds = append(jobIds, jobId) + } + + err = b.getBackupWorkerPool().WaitJobs(jobIds) + return task, err +} + +func (b *BackupContext) 
restorePartition(ctx context.Context, targetDBName, targetCollectionName string, + partitionBackup *backuppb.PartitionBackupInfo, task *backuppb.RestoreCollectionTask, isSameBucket bool, backupBucketName string, backupPath string, tempDir string) (*backuppb.RestoreCollectionTask, error) { + exist, err := b.getMilvusClient().HasPartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) + if err != nil { + log.Error("fail to check has partition", zap.Error(err)) + return task, err + } + if !exist { + err = retry.Do(ctx, func() error { + return b.getMilvusClient().CreatePartition(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName()) + }, retry.Attempts(10), retry.Sleep(1*time.Second)) + if err != nil { + log.Error("fail to create partition", zap.Error(err)) + return task, err + } + } + log.Info("create partition", + zap.String("collectionName", targetCollectionName), + zap.String("partitionName", partitionBackup.GetPartitionName())) + + // bulk insert + copyAndBulkInsert := func(files []string) error { + realFiles := make([]string, len(files)) + // if milvus bucket and backup bucket are not the same, should copy the data first + if !isSameBucket { + log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files)) + for i, file := range files { + // empty delta file, no need to copy + if file == "" { + realFiles[i] = file + } else { + err := b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file) + if err != nil { + log.Error("fail to copy backup date from backup bucket to restore target milvus bucket", zap.Error(err)) + return err } + realFiles[i] = tempDir + file } - } else { - realFiles = files } + } else { + realFiles = files + } + + err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp)) + if err != nil { + log.Error("fail to bulk insert to partition", + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetDBName", targetDBName), + zap.String("targetCollectionName", targetCollectionName), + zap.String("partition", partitionBackup.GetPartitionName()), + zap.Error(err)) + return err + } + return nil + } - err = b.executeBulkInsert(ctx, targetDBName, targetCollectionName, partitionBackup.GetPartitionName(), realFiles, int64(task.GetCollBackup().BackupTimestamp)) + if task.GetMetaOnly() { + task.Progress = 100 + } else { + groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups()) + if len(groupIds) == 1 && groupIds[0] == 0 { + // backward compatible old backup without group id + files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup) if err != nil { - log.Error("fail to bulk insert to partition", + log.Error("fail to get partition backup binlog files", + zap.Error(err), zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetDBName", targetDBName), zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName()), - zap.Error(err)) - return err + zap.String("partition", partitionBackup.GetPartitionName())) + return task, err + } + err = copyAndBulkInsert(files) + if err != nil { + log.Error("fail to (copy and) bulkinsert data", + zap.Error(err), + zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), + zap.String("targetCollectionName", 
targetCollectionName), + zap.String("partition", partitionBackup.GetPartitionName())) + return task, err } - return nil - } - - if task.GetMetaOnly() { - task.Progress = 100 } else { - groupIds := collectGroupIdsFromSegments(partitionBackup.GetSegmentBackups()) - if len(groupIds) == 1 && groupIds[0] == 0 { - // backward compatible old backup without group id - files, err := b.getBackupPartitionPaths(ctx, backupBucketName, backupPath, partitionBackup) + // bulk insert by segment groups + for _, groupId := range groupIds { + files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId) if err != nil { log.Error("fail to get partition backup binlog files", zap.Error(err), @@ -541,39 +592,16 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup zap.String("partition", partitionBackup.GetPartitionName())) return task, err } - } else { - // bulk insert by segment groups - for _, groupId := range groupIds { - files, err := b.getBackupPartitionPathsWithGroupID(ctx, backupBucketName, backupPath, partitionBackup, groupId) - if err != nil { - log.Error("fail to get partition backup binlog files", - zap.Error(err), - zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName())) - return task, err - } - err = copyAndBulkInsert(files) - if err != nil { - log.Error("fail to (copy and) bulkinsert data", - zap.Error(err), - zap.String("backupCollectionName", task.GetCollBackup().GetCollectionName()), - zap.String("targetCollectionName", targetCollectionName), - zap.String("partition", partitionBackup.GetPartitionName())) - return task, err - } - } - } - task.RestoredSize = task.RestoredSize + partitionBackup.GetSize() - if task.ToRestoreSize == 0 { - task.Progress = 100 - } else { - task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize) } } + task.RestoredSize = task.RestoredSize + partitionBackup.GetSize() + if task.ToRestoreSize == 0 { + task.Progress = 100 + } else { + task.Progress = int32(100 * task.RestoredSize / task.ToRestoreSize) + } } - - return task, err + return task, nil } func collectGroupIdsFromSegments(segments []*backuppb.SegmentBackupInfo) []int64 { diff --git a/internal/common/workerpool.go b/internal/common/workerpool.go index 73286a1b..fd3319cb 100644 --- a/internal/common/workerpool.go +++ b/internal/common/workerpool.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "go.uber.org/atomic" + "sync" "time" "golang.org/x/sync/errgroup" @@ -12,16 +14,25 @@ import ( // WorkerPool a pool that can control the total amount and rate of concurrency type WorkerPool struct { - job chan Job + job chan JobWithId g *errgroup.Group subCtx context.Context workerNum int lim *rate.Limiter + + nextId atomic.Int64 + jobsStatus sync.Map + jobsError sync.Map } type Job func(ctx context.Context) error +type JobWithId struct { + job Job + id int64 +} + // NewWorkerPool build a worker pool, rps 0 is unlimited func NewWorkerPool(ctx context.Context, workerNum int, rps int32) (*WorkerPool, error) { if workerNum <= 0 { @@ -36,13 +47,19 @@ func NewWorkerPool(ctx context.Context, workerNum int, rps int32) (*WorkerPool, lim = rate.NewLimiter(rate.Every(time.Second/time.Duration(rps)), 1) } - return &WorkerPool{job: make(chan Job), workerNum: workerNum, g: g, lim: lim, subCtx: subCtx}, nil + return &WorkerPool{job: make(chan JobWithId), workerNum: workerNum, g: g, lim: lim, subCtx: 
subCtx}, nil
+}
+
+// Start launches the dispatcher goroutine that feeds submitted jobs to the
+// errgroup-managed workers.
+func (p *WorkerPool) Start() {
+	p.g.Go(p.work)
+	p.nextId = atomic.Int64{}
 }
 
-func (p *WorkerPool) Start() { p.g.Go(p.work) }
 func (p *WorkerPool) work() error {
 	for j := range p.job {
-		job := j
+		jobWithId := j
 		p.g.Go(func() error {
 			if p.lim != nil {
 				if err := p.lim.Wait(p.subCtx); err != nil {
@@ -50,16 +67,54 @@ func (p *WorkerPool) work() error {
 				}
 			}
 
-			if err := job(p.subCtx); err != nil {
+			if err := jobWithId.job(p.subCtx); err != nil {
+				p.jobsError.Store(jobWithId.id, err)
+				p.jobsStatus.Store(jobWithId.id, "done")
 				return fmt.Errorf("workerpool: execute job %w", err)
 			}
-
+			p.jobsStatus.Store(jobWithId.id, "done")
 			return nil
 		})
 	}
 
 	return nil
 }
 
-func (p *WorkerPool) Submit(job Job) { p.job <- job }
-func (p *WorkerPool) Done() { close(p.job) }
-func (p *WorkerPool) Wait() error { return p.g.Wait() }
+func (p *WorkerPool) Submit(job Job) {
+	jobId := p.nextId.Inc()
+	p.job <- JobWithId{job: job, id: jobId}
+	// jobs submitted here are not tracked; use SubmitWithId to wait on them later
+}
+func (p *WorkerPool) Done() { close(p.job) }
+func (p *WorkerPool) Wait() error { return p.g.Wait() }
+
+func (p *WorkerPool) SubmitWithId(job Job) int64 {
+	jobId := p.nextId.Inc()
+	p.job <- JobWithId{job: job, id: jobId}
+	return jobId
+}
+
+// WaitJobs blocks until every job in jobIds has finished. It returns the
+// first error recorded by a failed job, or nil once all of them have
+// completed successfully.
+func (p *WorkerPool) WaitJobs(jobIds []int64) error {
+	for {
+		done := true
+		for _, jobId := range jobIds {
+			// a job's error is stored before it is marked done, so checking
+			// the error map first never misses a failure
+			if jobError, exist := p.jobsError.Load(jobId); exist {
+				return jobError.(error)
+			}
+			if status, ok := p.jobsStatus.Load(jobId); !ok || status != "done" {
+				done = false
+				break
+			}
+		}
+
+		if done {
+			return nil
+		}
+		// poll at a short interval instead of busy-spinning
+		time.Sleep(100 * time.Millisecond)
+	}
+}
diff --git a/internal/common/workerpool_test.go b/internal/common/workerpool_test.go
index 99bb5537..1c2112da 100644
--- a/internal/common/workerpool_test.go
+++ b/internal/common/workerpool_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/atomic"
@@ -48,3 +49,28 @@ func TestRunTaskReturnErr(t *testing.T) {
 	wp.Done()
 	assert.NotNil(t, wp.Wait())
 }
+
+func TestWaitJobs(t *testing.T) {
+	wp, err := NewWorkerPool(context.Background(), 3, 10)
+	assert.Nil(t, err)
+
+	wp.Start()
+	start := time.Now().Unix()
+	jobs := make([]int64, 0)
+	for i := 0; i < 10; i++ {
+		job := func(ctx context.Context) error {
+			// simulate a slow job so that the jobs overlap on the 3 workers
+			time.Sleep(2 * time.Second)
+			return nil
+		}
+		id := wp.SubmitWithId(job)
+		jobs = append(jobs, id)
+	}
+
+	err = wp.WaitJobs(jobs)
+
+	assert.NoError(t, err)
+	duration := time.Now().Unix() - start
+	// 10 jobs of 2s on 3 workers need at least ceil(10/3)*2 = 8 seconds
+	assert.True(t, duration >= 8)
+}
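
Usage note (a minimal sketch, not part of the patch): the restore path above submits one job per partition through SubmitWithId and then blocks on WaitJobs until every partition has been bulk-inserted; the line partitionBackup2 := partitionBackup pins the loop variable before the job closure captures it. The sketch below shows the same pattern in isolation, assuming only the API introduced in this patch; the worker count (3), the rate limit (10 rps), and the sleeping job body are illustrative stand-ins, not values used by milvus-backup.

	package main

	import (
		"context"
		"fmt"
		"time"

		"github.com/zilliztech/milvus-backup/internal/common"
	)

	func main() {
		// 3 workers limited to 10 jobs/second; rps 0 would disable rate limiting
		wp, err := common.NewWorkerPool(context.Background(), 3, 10)
		if err != nil {
			panic(err)
		}
		wp.Start()

		jobIds := make([]int64, 0)
		for i := 0; i < 5; i++ {
			i := i // pin the loop variable, as the restore loop does with partitionBackup2
			job := func(ctx context.Context) error {
				time.Sleep(time.Second) // stand-in for a copy + bulk-insert step
				fmt.Println("job", i, "done")
				return nil
			}
			// SubmitWithId returns a handle that WaitJobs can block on later
			jobIds = append(jobIds, wp.SubmitWithId(job))
		}

		// returns the first recorded job error, or nil once all jobs are done
		if err := wp.WaitJobs(jobIds); err != nil {
			fmt.Println("some jobs failed:", err)
		}
	}

Unlike Wait(), which returns only after Done() closes the job channel and all worker goroutines exit, WaitJobs waits for a specific batch of jobs while the pool stays open for later submissions, which is what executeRestoreCollectionTask relies on to restore partitions in parallel.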