diff --git a/core/reader/collection_reader.go b/core/reader/collection_reader.go index c397330..51ebab8 100644 --- a/core/reader/collection_reader.go +++ b/core/reader/collection_reader.go @@ -192,24 +192,31 @@ func (reader *CollectionReader) StartRead(ctx context.Context) { return } - recordCreateCollectionTime := make(map[string]*pb.CollectionInfo) + recordCreateCollectionTime := make(map[int64]map[string]*pb.CollectionInfo) repeatedCollectionID := make(map[int64]struct{}) - repeatedCollectionName := make(map[string]struct{}) + repeatedCollectionName := make(map[int64][]string) for _, info := range existedCollectionInfos { - // TODO should consider the same collection name in different db collectionName := info.Schema.GetName() createTime := info.CreateTime - lastCollectionInfo, recordOK := recordCreateCollectionTime[collectionName] + dbCollections := recordCreateCollectionTime[info.GetDbId()] + if dbCollections == nil { + dbCollections = map[string]*pb.CollectionInfo{ + collectionName: info, + } + recordCreateCollectionTime[info.GetDbId()] = dbCollections + continue + } + lastCollectionInfo, recordOK := dbCollections[collectionName] if recordOK { if createTime > lastCollectionInfo.CreateTime { repeatedCollectionID[lastCollectionInfo.ID] = struct{}{} - recordCreateCollectionTime[collectionName] = info + dbCollections[collectionName] = info } else { repeatedCollectionID[info.ID] = struct{}{} } - repeatedCollectionName[collectionName] = struct{}{} + repeatedCollectionName[info.GetDbId()] = append(repeatedCollectionName[info.GetDbId()], collectionName) } else { - recordCreateCollectionTime[collectionName] = info + dbCollections[collectionName] = info } } @@ -236,7 +243,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) { seekPositions := make([]*msgpb.MsgPosition, 0) if collectionSeekPositionMap != nil { seekPositions = lo.Values(collectionSeekPositionMap) - } else if _, ok := repeatedCollectionName[info.Schema.Name]; ok { + } else if dbCollections, ok := repeatedCollectionName[info.DbId]; ok && lo.Contains(dbCollections, info.Schema.Name) { log.Warn("server warn: find the repeated collection, the latest collection will use the collection start position.", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID)) for _, v := range info.StartPositions { seekPositions = append(seekPositions, &msgstream.MsgPosition{