Commit
tolerant of incorrect segments returned by Flush
wayblink committed Feb 22, 2023
1 parent 071c280 commit ec4264d
Showing 1 changed file with 42 additions and 28 deletions.
70 changes: 42 additions & 28 deletions core/backup_context.go
@@ -460,6 +460,14 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
 		}
 
 		// Flush
+		segmentEntitiesBeforeFlush, err := b.milvusClient.GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
+		if err != nil {
+			return backupInfo, err
+		}
+		log.Info("GetPersistentSegmentInfo before flush from milvus",
+			zap.String("collectionName", collection.GetCollectionName()),
+			zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)))
+
 		newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.milvusClient.Flush(ctx, collection.GetCollectionName(), false)
 		if err != nil {
 			log.Error(fmt.Sprintf("fail to flush the collection: %s", collection.GetCollectionName()))
@@ -473,48 +481,54 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
 		collection.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
 		collection.BackupPhysicalTimestamp = uint64(timeOfSeal)
 
-		flushSegments := append(newSealedSegmentIDs, flushedSegmentIDs...)
-		segmentEntities, err := b.milvusClient.GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
+		flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...)
+		segmentEntitiesAfterFlush, err := b.milvusClient.GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
 		if err != nil {
 			return backupInfo, err
 		}
-		log.Info("GetPersistentSegmentInfo from milvus",
+		log.Info("GetPersistentSegmentInfo after flush from milvus",
 			zap.String("collectionName", collection.GetCollectionName()),
-			zap.Int("segmentNum", len(segmentEntities)))
-
-		checkSegmentsFunc := func(flushSegmentIds []int64, segmentEntities []*entity.Segment) ([]*entity.Segment, error) {
-			segmentDict := utils.ArrayToMap(flushSegmentIds)
-			checkedSegments := make([]*entity.Segment, 0)
-			for _, seg := range segmentEntities {
-				sid := seg.ID
-				if _, ok := segmentDict[sid]; ok {
-					delete(segmentDict, sid)
-					checkedSegments = append(checkedSegments, seg)
-				} else {
-					log.Warn("this may be new segments after flush, skip it", zap.Int64("id", sid))
-				}
-			}
-			if len(segmentDict) > 0 {
-				errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict))
-				log.Warn(errorMsg)
-				return checkedSegments, errors.New(errorMsg)
-			}
-			return checkedSegments, nil
-		}
-		checkedSegments, err := checkSegmentsFunc(flushSegments, segmentEntities)
-		if err != nil {
-			collection.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
-			collection.ErrorMessage = err.Error()
-			return backupInfo, err
-		}
-		log.Info("Finished segment check",
-			zap.String("collectionName", collection.GetCollectionName()),
-			zap.Int("before check", len(segmentEntities)),
-			zap.Int("after check", len(checkedSegments)))
+			zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)),
+			zap.Int("segmentNumAfterFlush", len(segmentEntitiesAfterFlush)))
+
+		// fill segments
+		filledSegments := make([]*entity.Segment, 0)
+		segmentDict := utils.ArrayToMap(flushSegmentIDs)
+		for _, seg := range segmentEntitiesAfterFlush {
+			sid := seg.ID
+			if _, ok := segmentDict[sid]; ok {
+				delete(segmentDict, sid)
+				filledSegments = append(filledSegments, seg)
+			} else {
+				log.Warn("this may be new segments after flush, skip it", zap.Int64("id", sid))
+			}
+		}
+		for _, seg := range segmentEntitiesBeforeFlush {
+			sid := seg.ID
+			if _, ok := segmentDict[sid]; ok {
+				delete(segmentDict, sid)
+				filledSegments = append(filledSegments, seg)
+			} else {
+				log.Warn("this may be old segments before flush, skip it", zap.Int64("id", sid))
+			}
+		}
+		if len(segmentDict) > 0 {
+			// very rare situation, segments return in flush doesn't exist in either segmentEntitiesBeforeFlush and segmentEntitiesAfterFlush
+			errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict))
+			log.Warn(errorMsg)
+		}
+
+		log.Info("Finished fill segment",
+			zap.String("collectionName", collection.GetCollectionName()))
 
 		segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
 		partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo)
-		for _, segment := range checkedSegments {
+		for _, segment := range filledSegments {
 			segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows)
 			if err != nil {
 				return backupInfo, err
@@ -528,7 +542,7 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
 		}
 		log.Info("readSegmentInfo from storage",
 			zap.String("collectionName", collection.GetCollectionName()),
-			zap.Int("segmentNum", len(checkedSegments)))
+			zap.Int("segmentNum", len(filledSegments)))
 
 		leveledBackupInfo.segmentLevel = &backuppb.SegmentLevelBackupInfo{
 			Infos: segmentLevelBackupInfos,
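For readers skimming the diff, the change replaces a hard failure (checkSegmentsFunc returned an error when Flush reported a segment ID missing from GetPersistentSegmentInfo) with a tolerant "fill" pass: flushed IDs are first matched against the post-flush segment listing, then against the pre-flush listing, and any leftovers are only logged. Below is a minimal, self-contained sketch of that idea. It is not the project's code: the Segment type, fillSegments, and take are hypothetical stand-ins for the Milvus SDK's entity.Segment and the in-line loops in executeCreateBackup, and the standard library logger replaces zap.

package main

import (
	"fmt"
	"log"
)

// Segment is a stand-in for the segment metadata returned by
// GetPersistentSegmentInfo in the real code.
type Segment struct {
	ID      int64
	NumRows int64
}

// fillSegments matches the segment IDs reported by Flush against the
// segment listings taken before and after the flush. IDs found in the
// after-flush listing win; IDs only present in the before-flush listing
// are still accepted; anything left over is logged and skipped rather
// than failing the whole backup.
func fillSegments(flushIDs []int64, before, after []Segment) []Segment {
	pending := make(map[int64]struct{}, len(flushIDs))
	for _, id := range flushIDs {
		pending[id] = struct{}{}
	}

	filled := make([]Segment, 0, len(flushIDs))
	take := func(segs []Segment) {
		for _, seg := range segs {
			if _, ok := pending[seg.ID]; ok {
				delete(pending, seg.ID)
				filled = append(filled, seg)
			}
		}
	}
	take(after)  // most flushed segments show up in the post-flush listing
	take(before) // fall back to the pre-flush listing

	if len(pending) > 0 {
		// Tolerant behaviour: warn instead of returning an error.
		ids := make([]int64, 0, len(pending))
		for id := range pending {
			ids = append(ids, id)
		}
		log.Printf("segments returned by Flush not found in either listing, skipping: %v", ids)
	}
	return filled
}

func main() {
	before := []Segment{{ID: 1, NumRows: 100}}
	after := []Segment{{ID: 2, NumRows: 50}, {ID: 3, NumRows: 75}}
	// Flush reports 1 (only visible before flush), 2 (visible after) and 9 (unknown).
	fmt.Println(fillSegments([]int64{1, 2, 9}, before, after))
}

In this toy run, segment 2 is taken from the after-flush listing, segment 1 from the before-flush listing, and segment 9 only produces a warning, which mirrors why the commit no longer marks the backup task as failed in this situation.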
