Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 24a6bcb

Browse files
committed
implement meta tag record stuff for partitioned index
1 parent 29812c4 commit 24a6bcb

File tree

1 file changed

+31
-3
lines changed

1 file changed

+31
-3
lines changed

idx/memory/partitioned_idx.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -545,10 +545,38 @@ func (p *PartitionedMemoryIdx) idsByTagQuery(orgId uint32, query TagQuery) IdSet
545545
return response
546546
}
547547

548-
func (m *PartitionedMemoryIdx) MetaTagRecordList(orgId uint32) []idx.MetaTagRecord {
548+
func (p *PartitionedMemoryIdx) MetaTagRecordList(orgId uint32) []idx.MetaTagRecord {
549+
for _, m := range p.Partition {
550+
// all partitions should have all meta records
551+
return m.MetaTagRecordList(orgId)
552+
}
549553
return nil
550554
}
551555

552-
func (m *PartitionedMemoryIdx) MetaTagRecordUpsert(orgId uint32, rawRecord idx.MetaTagRecord) (idx.MetaTagRecord, bool, error) {
553-
return idx.MetaTagRecord{}, false, nil
556+
func (p *PartitionedMemoryIdx) MetaTagRecordUpsert(orgId uint32, rawRecord idx.MetaTagRecord) (idx.MetaTagRecord, bool, error) {
557+
g, _ := errgroup.WithContext(context.Background())
558+
559+
var i int
560+
var record idx.MetaTagRecord
561+
var created bool
562+
for _, m := range p.Partition {
563+
g.Go(func() error {
564+
var err error
565+
if i == 0 {
566+
record, created, err = m.MetaTagRecordUpsert(orgId, rawRecord)
567+
} else {
568+
_, _, err = m.MetaTagRecordUpsert(orgId, rawRecord)
569+
}
570+
571+
return err
572+
})
573+
i++
574+
}
575+
576+
if err := g.Wait(); err != nil {
577+
log.Errorf("memory-idx: failed to upsert meta tag record in at least one partition: %s", err)
578+
return record, created, err
579+
}
580+
581+
return record, created, nil
554582
}

0 commit comments

Comments
 (0)