Skip to content

Commit

Permalink
use segment info after flush (#482)
Browse files Browse the repository at this point in the history
Signed-off-by: huanghaoyuanhhy <haoyuan.huang@zilliz.com>
  • Loading branch information
huanghaoyuanhhy authored Dec 17, 2024
1 parent e22a744 commit f497caf
Showing 1 changed file with 1 addition and 29 deletions.
30 changes: 1 addition & 29 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,6 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
zap.Uint64("BackupTimestamp", collectionBackup.BackupTimestamp),
zap.Any("channelCPs", channelCPs))

flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...)
segmentEntitiesAfterFlush, err := b.getMilvusClient().GetPersistentSegmentInfo(ctx, collectionBackup.GetDbName(), collectionBackup.GetCollectionName())
if err != nil {
return err
Expand All @@ -434,31 +433,8 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
zap.Int64s("segmentIDsAfterFlush", segmentIDsAfterFlush),
zap.Int64s("newL0SegmentsIDs", newL0SegmentsIDs))

segmentEntities := segmentEntitiesBeforeFlush
for _, seg := range segmentEntitiesAfterFlush {
if !lo.Contains(segmentIDsBeforeFlush, seg.ID) {
segmentEntities = append(segmentEntities, seg)
}
}

// segmentIDs
unfilledSegmentIDs := make([]int64, 0)
// union of (intersection of BeforeFlushIDs and AfterFlushIDs) and flushSegmentIDs
for _, seg := range segmentEntitiesAfterFlush {
if lo.Contains(segmentIDsBeforeFlush, seg.ID) {
unfilledSegmentIDs = append(unfilledSegmentIDs, seg.ID)
}
}
for _, segID := range flushSegmentIDs {
if !lo.Contains(unfilledSegmentIDs, segID) {
unfilledSegmentIDs = append(unfilledSegmentIDs, segID)
}
}
unfilledSegmentIDs = append(unfilledSegmentIDs, newL0SegmentsIDs...)
for _, seg := range segmentEntities {
if lo.Contains(unfilledSegmentIDs, seg.ID) {
unfilledSegments = append(unfilledSegments, seg)
}
unfilledSegments = append(unfilledSegments, seg)
}
} else {
// Flush
Expand All @@ -476,10 +452,6 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
}
}

if err != nil {
return err
}

newSegIDs := lo.Map(unfilledSegments, func(segment *entity.Segment, _ int) int64 { return segment.ID })
log.Debug("Finished fill segment",
zap.String("databaseName", collectionBackup.GetDbName()),
Expand Down

0 comments on commit f497caf

Please sign in to comment.