Skip to content

Commit

Permalink
chore: Rename HeapIterator to MergeEntryIterator (grafana#13975)
Browse files Browse the repository at this point in the history
Stumbled across this iterator interface and was confused about its name. It leaked its implementation details, which were no longer even accurate, since the implementation has changed.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored and pascal-sochacki committed Aug 29, 2024
1 parent d669d54 commit 23852ab
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
13 changes: 7 additions & 6 deletions pkg/iter/entry_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,17 @@ func (i *streamIterator) Close() error {
return nil
}

// HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len
// Not safe for concurrent use
type HeapIterator interface {
// MergeEntryIterator exposes additional methods that are used by the Tailer only.
// Not safe for concurrent use!
type MergeEntryIterator interface {
EntryIterator

Peek() time.Time
IsEmpty() bool
Push(EntryIterator)
}

// mergeEntryIterator iterates over a heap of iterators and merge duplicate entries.
// mergeEntryIterator implements the MergeEntryIterator interface functions.
type mergeEntryIterator struct {
tree *loser.Tree[sortFields, EntryIterator]
stats *stats.Context
Expand All @@ -74,11 +75,11 @@ type mergeEntryIterator struct {
errs []error
}

// NewMergeEntryIterator returns a new iterator which uses a heap to merge together entries for multiple iterators and deduplicate entries if any.
// NewMergeEntryIterator returns a new iterator which uses a loser tree to merge together entries from multiple iterators and deduplicate entries if any.
// The iterator only orders and merges entries across the given `is` iterators; it does not merge entries within an individual iterator.
// This means using this iterator with a single input iterator will produce the same result as that input iterator.
// If you don't need to deduplicate entries, use `NewSortEntryIterator` instead.
func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) MergeEntryIterator {
maxVal, less := treeLess(direction)
result := &mergeEntryIterator{stats: stats.FromContext(ctx)}
result.tree = loser.New(is, maxVal, sortFieldsAt, less, result.closeEntry)
Expand Down
8 changes: 4 additions & 4 deletions pkg/iter/entry_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,16 @@ func TestIteratorMultipleLabels(t *testing.T) {
func TestMergeIteratorPrefetch(t *testing.T) {
t.Parallel()

type tester func(t *testing.T, i HeapIterator)
type tester func(t *testing.T, i MergeEntryIterator)

tests := map[string]tester{
"prefetch on IsEmpty() when called as first method": func(t *testing.T, i HeapIterator) {
"prefetch on IsEmpty() when called as first method": func(t *testing.T, i MergeEntryIterator) {
assert.Equal(t, false, i.IsEmpty())
},
"prefetch on Peek() when called as first method": func(t *testing.T, i HeapIterator) {
"prefetch on Peek() when called as first method": func(t *testing.T, i MergeEntryIterator) {
assert.Equal(t, time.Unix(0, 0), i.Peek())
},
"prefetch on Next() when called as first method": func(t *testing.T, i HeapIterator) {
"prefetch on Next() when called as first method": func(t *testing.T, i MergeEntryIterator) {
assert.True(t, i.Next())
assert.Equal(t, logproto.Entry{Timestamp: time.Unix(0, 0), Line: "0"}, i.At())
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// Tailer manages complete lifecycle of a tail request
type Tailer struct {
// openStreamIterator is for streams already open
openStreamIterator iter.HeapIterator
openStreamIterator iter.MergeEntryIterator
streamMtx sync.Mutex // for synchronizing access to openStreamIterator

currEntry logproto.Entry
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
for _, chunks := range chks {
if len(chunks) != 0 && len(chunks[0]) != 0 {
streamPipeline := it.pipeline.ForStream(labels.NewBuilder(chunks[0][0].Chunk.Metric).Del(labels.MetricName).Labels())
iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk)
iterator, err := it.buildMergeIterator(chunks, from, through, streamPipeline, nextChunk)
if err != nil {
return nil, err
}
Expand All @@ -433,7 +433,7 @@ func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyC
return result, nil
}

func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
func (it *logBatchIterator) buildMergeIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))

for i := range chks {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1649,9 +1649,9 @@ func TestBuildHeapIterator(t *testing.T) {
ctx: ctx,
pipeline: log.NewNoopPipeline(),
}
it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
it, err := b.buildMergeIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
if err != nil {
t.Errorf("buildHeapIterator error = %v", err)
t.Errorf("buildMergeIterator error = %v", err)
return
}
req := newQuery("{foo=\"bar\"}", from, from.Add(6*time.Millisecond), nil, nil)
Expand Down

0 comments on commit 23852ab

Please sign in to comment.