Skip to content

Commit

Permalink
Define meta v2 (zilliztech#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
wayblink authored May 23, 2024
1 parent 6f25560 commit 93f9ae0
Show file tree
Hide file tree
Showing 8 changed files with 888 additions and 512 deletions.
53 changes: 14 additions & 39 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const (
BULKINSERT_SLEEP_INTERVAL = 5
BACKUP_NAME = "BACKUP_NAME"
COLLECTION_RENAME_SUFFIX = "COLLECTION_RENAME_SUFFIX"
WORKER_NUM = 100
RPS = 1000
)

Expand All @@ -48,18 +47,11 @@ type BackupContext struct {
milvusRootPath string
backupRootPath string

backupNameIdDict sync.Map //map[string]string
backupTasks sync.Map //map[string]*backuppb.BackupInfo

backupTasksCache sync.Map //map[string]*LeveledBackupInfo
updateMu sync.Mutex

restoreTasks map[string]*backuppb.RestoreBackupTask
meta *MetaManager

backupCollectionWorkerPool *common.WorkerPool
backupCopyDataWorkerPool *common.WorkerPool
//bulkinsertWorkerPool *common.WorkerPool
bulkinsertWorkerPools map[string]*common.WorkerPool
bulkinsertWorkerPools map[string]*common.WorkerPool
}

func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (gomilvus.Client, error) {
Expand Down Expand Up @@ -97,12 +89,7 @@ func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (s
}

func (b *BackupContext) Start() error {
//b.backupTasks = sync.Map{}
b.backupTasksCache = sync.Map{}
b.backupNameIdDict = sync.Map{}
b.restoreTasks = make(map[string]*backuppb.RestoreBackupTask)
b.started = true
b.updateMu = sync.Mutex{}
log.Info(fmt.Sprintf("%+v", b.params.BackupCfg))
log.Info(fmt.Sprintf("%+v", b.params.HTTPCfg))
return nil
Expand All @@ -126,6 +113,7 @@ func CreateBackupContext(ctx context.Context, params paramtable.BackupParams) *B
milvusRootPath: params.MinioCfg.RootPath,
backupRootPath: params.MinioCfg.BackupRootPath,
bulkinsertWorkerPools: make(map[string]*common.WorkerPool),
meta: newMetaManager(),
}
}

Expand Down Expand Up @@ -229,31 +217,17 @@ func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBack
resp.Code = backuppb.ResponseCode_Parameter_Error
resp.Msg = "empty backup name and backup id, please set a backup name or id"
} else if request.GetBackupId() != "" {
if value, ok := b.backupTasksCache.Load(request.GetBackupId()); ok {
backupInfo, err := levelToTree(value.(*LeveledBackupInfo))
if err != nil {
resp.Code = backuppb.ResponseCode_Fail
resp.Msg = err.Error()
} else {
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
resp.Data = backupInfo
}
}
backupInfo := b.meta.GetFullMeta(request.GetBackupId())
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
resp.Data = backupInfo
} else if request.GetBackupName() != "" {
if id, ok := b.backupNameIdDict.Load(request.GetBackupName()); ok {
backupInfo := b.meta.GetBackupByName(request.GetBackupName())
if backupInfo != nil {
fullBackupInfo := b.meta.GetFullMeta(backupInfo.Id)
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
backup, ok := b.backupTasksCache.Load(id)
if ok {
backupInfo, err := levelToTree(backup.(*LeveledBackupInfo))
if err != nil {
resp.Code = backuppb.ResponseCode_Fail
resp.Msg = err.Error()
} else {
resp.Data = backupInfo
}
}
resp.Data = fullBackupInfo
} else {
var backupBucketName string
var backupPath string
Expand Down Expand Up @@ -529,10 +503,11 @@ func (b *BackupContext) GetRestore(ctx context.Context, request *backuppb.GetRes
return resp
}

if value, ok := b.restoreTasks[request.GetId()]; ok {
task := b.meta.GetRestoreTask(request.GetId())
if task != nil {
resp.Code = backuppb.ResponseCode_Success
resp.Msg = "success"
resp.Data = UpdateRestoreBackupTask(value)
resp.Data = UpdateRestoreBackupTask(task)
return resp
} else {
resp.Code = backuppb.ResponseCode_Fail
Expand Down
Loading

0 comments on commit 93f9ae0

Please sign in to comment.