Skip to content

Commit

Permalink
Do not reuse any chunk iterators
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Feb 13, 2023
1 parent 3c704dc commit e8bd049
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 29 deletions.
5 changes: 2 additions & 3 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ func (q *Queue) Push(alerts []*notifier.Alert) {
lb.Set(l.Name, l.Value)
}

var keep bool
a.Labels, keep = relabel.Process(lb.Labels(nil), q.alertRelabelConfigs...)
if keep {
if lset, keep := relabel.Process(lb.Labels(nil), q.alertRelabelConfigs...); keep {
a.Labels = lset
relabeledAlerts = append(relabeledAlerts, a)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dedup/chunk_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewChunkSeriesMerger() storage.VerticalChunkSeriesMergeFunc {
ChunkIteratorFn: func(iterator chunks.Iterator) chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(series))
for _, s := range series {
iterators = append(iterators, s.Iterator(iterator))
iterators = append(iterators, s.Iterator(nil))
}
return &dedupChunksIterator{
iterators: iterators,
Expand Down
50 changes: 25 additions & 25 deletions pkg/dedup/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,21 @@ func (s *dedupSeries) Labels() labels.Labels {

// pushdownIterator creates an iterator that handles
// all pushed down series.
func (s *dedupSeries) pushdownIterator(iterator chunkenc.Iterator) chunkenc.Iterator {
func (s *dedupSeries) pushdownIterator(chunkenc.Iterator) chunkenc.Iterator {
var pushedDownIterator adjustableSeriesIterator
if s.isCounter {
pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(iterator)}
pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
} else {
pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(iterator)}
pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
}

for _, o := range s.pushedDown[1:] {
var replicaIterator adjustableSeriesIterator

if s.isCounter {
replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
} else {
replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
}

pushedDownIterator = noopAdjustableSeriesIterator{newPushdownSeriesIterator(pushedDownIterator, replicaIterator, s.f)}
Expand All @@ -210,39 +210,39 @@ func (s *dedupSeries) pushdownIterator(iterator chunkenc.Iterator) chunkenc.Iter

// allSeriesIterator creates an iterator over all series - pushed down
// and regular replicas.
func (s *dedupSeries) allSeriesIterator(iterator chunkenc.Iterator) chunkenc.Iterator {
func (s *dedupSeries) allSeriesIterator(chunkenc.Iterator) chunkenc.Iterator {
var replicasIterator, pushedDownIterator adjustableSeriesIterator
if len(s.replicas) != 0 {
if s.isCounter {
replicasIterator = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(iterator)}
replicasIterator = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
} else {
replicasIterator = noopAdjustableSeriesIterator{Iterator: s.replicas[0].Iterator(iterator)}
replicasIterator = noopAdjustableSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
}

for _, o := range s.replicas[1:] {
var replicaIter adjustableSeriesIterator
if s.isCounter {
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
} else {
replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
}
replicasIterator = newDedupSeriesIterator(replicasIterator, replicaIter)
}
}

if len(s.pushedDown) != 0 {
if s.isCounter {
pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(iterator)}
pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
} else {
pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(iterator)}
pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
}

for _, o := range s.pushedDown[1:] {
var replicaIter adjustableSeriesIterator
if s.isCounter {
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
} else {
replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
}
pushedDownIterator = newDedupSeriesIterator(pushedDownIterator, replicaIter)
}
Expand All @@ -257,34 +257,34 @@ func (s *dedupSeries) allSeriesIterator(iterator chunkenc.Iterator) chunkenc.Ite
return newDedupSeriesIterator(pushedDownIterator, replicasIterator)
}

func (s *dedupSeries) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
func (s *dedupSeries) Iterator(chunkenc.Iterator) chunkenc.Iterator {
// This function needs a regular iterator over all series. Behavior is identical
// whether it was pushed down or not.
if s.f == "group" {
return s.allSeriesIterator(iterator)
return s.allSeriesIterator(nil)
}
// If there are no replicas then jump straight to constructing an iterator
// for pushed down series.
if len(s.replicas) == 0 {
return s.pushdownIterator(iterator)
return s.pushdownIterator(nil)
}

// Finally, if we have both then construct a tree out of them.
// Pushed down series have their own special iterator.
// We deduplicate everything in the end.
var it adjustableSeriesIterator
if s.isCounter {
it = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(iterator)}
it = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
} else {
it = noopAdjustableSeriesIterator{Iterator: s.replicas[0].Iterator(iterator)}
it = noopAdjustableSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
}

for _, o := range s.replicas[1:] {
var replicaIter adjustableSeriesIterator
if s.isCounter {
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
} else {
replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
}
it = newDedupSeriesIterator(it, replicaIter)
}
Expand All @@ -296,18 +296,18 @@ func (s *dedupSeries) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
// Join all of the pushed down iterators into one.
var pushedDownIterator adjustableSeriesIterator
if s.isCounter {
pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(iterator)}
pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
} else {
pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(iterator)}
pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
}

for _, o := range s.pushedDown[1:] {
var replicaIterator adjustableSeriesIterator

if s.isCounter {
replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
} else {
replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(iterator)}
replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
}

pushedDownIterator = noopAdjustableSeriesIterator{newPushdownSeriesIterator(pushedDownIterator, replicaIterator, s.f)}
Expand Down

0 comments on commit e8bd049

Please sign in to comment.