Ensure native histograms counter reset hints are corrected when merging results from different sources #9909

Merged
36 commits merged on Dec 2, 2024
Changes shown from 34 commits

Commits (36)
9c1cf59
Add krajo's test (currently failing)
fionaliao Nov 13, 2024
ed75cb5
Add test showing single ingester return
fionaliao Nov 13, 2024
dfeabaa
add failing test
fionaliao Nov 13, 2024
a807173
typo
fionaliao Nov 13, 2024
d48c564
Fix but overdetecting unknowns
fionaliao Nov 14, 2024
981617f
Link batches from same iterator
fionaliao Nov 14, 2024
7fb9087
clarify comment
fionaliao Nov 14, 2024
5b0026a
Update pkg/querier/batch/stream.go
fionaliao Nov 15, 2024
a6d624f
Update pkg/querier/batch/stream.go
fionaliao Nov 15, 2024
b5c1eca
Fix stream test
fionaliao Nov 15, 2024
fa510a0
Fix nh counter resets when merging overlapping chunks
krajorama Nov 15, 2024
d3fcd03
Set UCR when NCR for some tests
fionaliao Nov 25, 2024
d8fb5c4
Pass id as constructor arg
fionaliao Nov 25, 2024
b0246e7
Uncomment
fionaliao Nov 25, 2024
0c8ee53
Remove unused function
fionaliao Nov 25, 2024
ef1cafb
Add tests for checking hints
fionaliao Nov 25, 2024
7e2148e
More complex reset tests
fionaliao Nov 25, 2024
9cc1b84
Remove unused test func
fionaliao Nov 25, 2024
62e7235
Remove TODO
fionaliao Nov 25, 2024
732ec71
Add interleaving test
fionaliao Nov 26, 2024
aff826f
Clean up ingester test
fionaliao Nov 26, 2024
89cd0ed
Consistent ID
fionaliao Nov 26, 2024
3594ee6
Update ID again
fionaliao Nov 26, 2024
15643c5
Refactor ingester querying tests and add streaming case
fionaliao Nov 26, 2024
53594d6
Add store-gw tests
fionaliao Nov 26, 2024
e9ccf39
Add float test
fionaliao Nov 26, 2024
00b0e96
Explain reuse of values
fionaliao Nov 26, 2024
d780a01
Add comment
fionaliao Nov 27, 2024
0f4f294
Fix comment typo
fionaliao Nov 27, 2024
deb2847
Fix more comments
fionaliao Nov 27, 2024
9932a4f
Update CHANGELOG
fionaliao Nov 27, 2024
6844b1e
Lint
fionaliao Nov 27, 2024
24bea3c
More lint
fionaliao Nov 27, 2024
3a2310c
Fix bad rename
fionaliao Nov 27, 2024
99e723d
Update CHANGELOG.md
fionaliao Nov 28, 2024
183f987
Remove old comment
fionaliao Nov 28, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -85,6 +85,7 @@
* [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666
* [BUGFIX] Fix issue when using the experimental `-ruler.max-independent-rule-evaluation-concurrency` feature, where the ruler could panic as it updates a running ruleset or shutdowns. #9726
* [BUGFIX] Always return unknown hint for first sample in non-gauge native histograms chunk to avoid incorrect counter reset hints when merging chunks from different sources. #10033
* [BUGFIX] Ensure counter reset hints are corrected when merging batches. #9909
* [BUGFIX] Ingester: Fix race condition in per-tenant TSDB creation. #9708
* [BUGFIX] Ingester: Fix race condition in exemplar adding. #9765
* [BUGFIX] Ingester: Fix race condition in native histogram appending. #9765
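In plain terms, the new CHANGELOG entry above (#9909) describes this behaviour: once histogram samples from different sources (for example in-order and out-of-order chunks, or ingester and store-gateway results) get interleaved, a NotCounterReset hint on the first sample taken from a new source can no longer be trusted, because that hint was computed against the previous sample of the same source. A minimal sketch of the idea, not the PR's actual code (the real fix threads a per-source id into batchStream.merge in pkg/querier/batch, shown further down):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
)

// mergedSample is an illustrative stand-in, not a Mimir type.
type mergedSample struct {
	sourceID int // which ingester/store-gateway stream the sample came from
	h        *histogram.Histogram
}

// downgradeHintsAtSourceBoundaries clears NotCounterReset hints wherever the
// merged stream switches sources, since those hints were computed relative to
// the previous sample of the *same* source.
func downgradeHintsAtSourceBoundaries(samples []mergedSample) {
	for i := 1; i < len(samples); i++ {
		if samples[i].sourceID != samples[i-1].sourceID &&
			samples[i].h.CounterResetHint == histogram.NotCounterReset {
			samples[i].h.CounterResetHint = histogram.UnknownCounterReset
		}
	}
}

func main() {
	merged := []mergedSample{
		{sourceID: 0, h: &histogram.Histogram{CounterResetHint: histogram.UnknownCounterReset}},
		{sourceID: 1, h: &histogram.Histogram{CounterResetHint: histogram.NotCounterReset}},
	}
	downgradeHintsAtSourceBoundaries(merged)
	fmt.Println(merged[1].h.CounterResetHint == histogram.UnknownCounterReset) // true
}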
177 changes: 177 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -44,6 +44,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
@@ -5749,6 +5750,182 @@ func TestIngester_QueryExemplars(t *testing.T) {
})
}

// This test shows a single ingester returns compacted OOO and in-order chunks separately after compaction, even if they overlap.
func TestIngester_QueryStream_CounterResets(t *testing.T) {
// Create ingester.
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.HeadCompactionInterval = 1 * time.Hour // Long enough to not be reached during the test.
cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout = 1 * time.Second
cfg.BlocksStorageConfig.TSDB.HeadCompactionIntervalJitterEnabled = false
cfg.TSDBConfigUpdatePeriod = 1 * time.Second

// Set the OOO window to 30 minutes and enable native histograms.
limits := map[string]*validation.Limits{
userID: {
OutOfOrderTimeWindow: model.Duration(30 * time.Minute),
OOONativeHistogramsIngestionEnabled: true,
NativeHistogramsIngestionEnabled: true,
},
}
override, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits))
require.NoError(t, err)

i, err := prepareIngesterWithBlockStorageAndOverrides(t, cfg, override, nil, "", "", nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's healthy.
test.Poll(t, 1*time.Second, 1, func() interface{} {
return i.lifecycler.HealthyInstancesCount()
})

// Push series.
ctx := user.InjectOrgID(context.Background(), userID)

histLbls := labels.FromStrings(labels.MetricName, "foo", "series_id", strconv.Itoa(0), "type", "histogram")
histReq := mockHistogramWriteRequest(histLbls, int64(0), 4, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(2), 6, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(4), 8, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(1), 2, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

histReq = mockHistogramWriteRequest(histLbls, int64(3), 3, false)
_, err = i.Push(ctx, histReq)
require.NoError(t, err)

// Create a GRPC server used to query back the data.
serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
defer serv.GracefulStop()
client.RegisterIngesterServer(serv, i)

listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

go func() {
require.NoError(t, serv.Serve(listener))
}()

inst := ring.InstanceDesc{Id: "test", Addr: listener.Addr().String()}
c, err := client.MakeIngesterClient(inst, defaultClientTestConfig(), client.NewMetrics(nil))
require.NoError(t, err)
defer c.Close()

runQuery := func() ([]chunkenc.CounterResetHeader, [][]sample) {
s, err := c.QueryStream(ctx, &client.QueryRequest{
StartTimestampMs: 0,
EndTimestampMs: 5,

Matchers: []*client.LabelMatcher{{
Type: client.EQUAL,
Name: model.MetricNameLabel,
Value: "foo",
}},
})
require.NoError(t, err)

recvMsgs := 0

chunks := []client.Chunk{}
for {
resp, err := s.Recv()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)

for _, c := range resp.Chunkseries {
chunks = append(chunks, c.Chunks...)
}
recvMsgs++
}

require.Equal(t, recvMsgs, 1)
// Sort chunks by time
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].StartTimestampMs < chunks[j].StartTimestampMs
})

headers := []chunkenc.CounterResetHeader{}
var samples [][]sample
for _, c := range chunks {
require.Equal(t, c.Encoding, int32(chunk.PrometheusHistogramChunk))
chk, err := chunkenc.FromData(chunkenc.EncHistogram, c.Data)
require.NoError(t, err)

s := []sample{}
it := chk.Iterator(nil)
for it.Next() != chunkenc.ValNone {
ts, h := it.AtHistogram(nil)
s = append(s, sample{t: ts, h: h})
}
samples = append(samples, s)
headers = append(headers, chk.(*chunkenc.HistogramChunk).GetCounterResetHeader())
}
return headers, samples
}

// Check samples before compaction (OOO and in-order samples are merged when both are in the head).
actHeaders, actSamples := runQuery()
require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.CounterReset, chunkenc.CounterReset}, actHeaders)
require.Equal(t, [][]sample{
{
{t: 0, h: histogramWithHint(4, histogram.UnknownCounterReset)},
},
{
{t: 1, h: histogramWithHint(2, histogram.UnknownCounterReset)},
{t: 2, h: histogramWithHint(6, histogram.NotCounterReset)},
},
{
{t: 3, h: histogramWithHint(3, histogram.UnknownCounterReset)},
{t: 4, h: histogramWithHint(8, histogram.NotCounterReset)},
},
}, actSamples)

time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter)))

// Compaction
i.compactBlocks(context.Background(), false, 0, nil) // Should be compacted because the TSDB is idle.
verifyCompactedHead(t, i, true)

defer c.Close()

actHeaders, actSamples = runQuery()
require.Equal(t, []chunkenc.CounterResetHeader{chunkenc.UnknownCounterReset, chunkenc.UnknownCounterReset}, actHeaders)
require.Equal(t, [][]sample{
{
{t: 0, h: histogramWithHint(4, histogram.UnknownCounterReset)},
{t: 2, h: histogramWithHint(6, histogram.NotCounterReset)},
{t: 4, h: histogramWithHint(8, histogram.NotCounterReset)},
},
{
{t: 1, h: histogramWithHint(2, histogram.UnknownCounterReset)},
{t: 3, h: histogramWithHint(3, histogram.NotCounterReset)},
},
}, actSamples)
}

func histogramWithHint(idx int, hint histogram.CounterResetHint) *histogram.Histogram {
h := util_test.GenerateTestHistogram(idx)
h.CounterResetHint = hint
return h
}

type sample struct {
t int64
h *histogram.Histogram
}

func writeRequestSingleSeries(lbls labels.Labels, samples []mimirpb.Sample) *mimirpb.WriteRequest {
req := &mimirpb.WriteRequest{
Source: mimirpb.API,
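The per-chunk expectations in the test above lean on a rule described by the #10033 CHANGELOG entry earlier in this diff: the first sample decoded from a non-gauge native histogram chunk is reported with an UnknownCounterReset hint, because its reset information lives in the chunk-level header and cannot be trusted once chunks from different sources are merged. A small consumer-side restatement of that rule, for illustration only and not the vendored Prometheus decoder:

package sketch

import "github.com/prometheus/prometheus/model/histogram"

// downgradeFirstHint restates the first-sample rule over an already-decoded
// chunk: whatever hint was produced for the first sample of a counter
// histogram chunk, callers treat it as unknown.
func downgradeFirstHint(samples []*histogram.Histogram) {
	if len(samples) == 0 {
		return
	}
	if samples[0].CounterResetHint != histogram.GaugeType {
		samples[0].CounterResetHint = histogram.UnknownCounterReset
	}
}

This matches the assertions above: the first sample of every returned chunk carries histogram.UnknownCounterReset, and only subsequent samples within the same chunk carry NotCounterReset.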
57 changes: 47 additions & 10 deletions pkg/querier/batch/chunk_test.go
@@ -6,6 +6,7 @@
package batch

import (
"slices"
"strconv"
"testing"
"time"
@@ -82,7 +83,15 @@ func mkGenericChunk(t require.TestingT, from model.Time, points int, encoding ch
return NewGenericChunk(int64(ck.From), int64(ck.Through), ck.Data.NewIterator)
}

func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding) {
type testBatchOptions uint

const (
// setNotCounterResetHintsAsUnknown can be used in cases where it's onerous to generate all the expected counter
// reset hints (e.g. merging lots of chunks together).
setNotCounterResetHintsAsUnknown testBatchOptions = iota
)

func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding, opts ...testBatchOptions) {
nextExpectedTS := model.TimeFromUnix(0)
var assertPoint func(i int)
switch encoding {
@@ -100,8 +109,15 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
ts, h := iter.AtHistogram(nil)
require.EqualValues(t, int64(nextExpectedTS), ts, strconv.Itoa(i))
expH := test.GenerateTestHistogram(int(nextExpectedTS))
if nextExpectedTS > 0 {
expH.CounterResetHint = histogram.NotCounterReset
if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
if h.CounterResetHint == histogram.NotCounterReset {
h.CounterResetHint = histogram.UnknownCounterReset
expH.CounterResetHint = histogram.UnknownCounterReset
}
} else {
if nextExpectedTS > 0 {
expH.CounterResetHint = histogram.NotCounterReset
}
}
test.RequireHistogramEqual(t, expH, h, strconv.Itoa(i))
nextExpectedTS = nextExpectedTS.Add(step)
@@ -112,8 +128,15 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
ts, fh := iter.AtFloatHistogram(nil)
require.EqualValues(t, int64(nextExpectedTS), ts, strconv.Itoa(i))
expFH := test.GenerateTestFloatHistogram(int(nextExpectedTS))
if nextExpectedTS > 0 {
expFH.CounterResetHint = histogram.NotCounterReset
if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
if fh.CounterResetHint == histogram.NotCounterReset {
fh.CounterResetHint = histogram.UnknownCounterReset
expFH.CounterResetHint = histogram.UnknownCounterReset
}
} else {
if nextExpectedTS > 0 {
expFH.CounterResetHint = histogram.NotCounterReset
}
}
test.RequireFloatHistogramEqual(t, expFH, fh, strconv.Itoa(i))
nextExpectedTS = nextExpectedTS.Add(step)
@@ -127,7 +150,7 @@ func testIter(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
require.Equal(t, chunkenc.ValNone, iter.Next())
}

func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding) {
func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding chunk.Encoding, opts ...testBatchOptions) {
var assertPoint func(expectedTS int64, valType chunkenc.ValueType)
switch encoding {
case chunk.PrometheusXorChunk:
@@ -144,8 +167,15 @@ func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
ts, h := iter.AtHistogram(nil)
require.EqualValues(t, expectedTS, ts)
expH := test.GenerateTestHistogram(int(expectedTS))
if expectedTS > 0 {
expH.CounterResetHint = histogram.NotCounterReset
if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
if h.CounterResetHint == histogram.NotCounterReset {
h.CounterResetHint = histogram.UnknownCounterReset
expH.CounterResetHint = histogram.UnknownCounterReset
}
} else {
if expectedTS > 0 {
expH.CounterResetHint = histogram.NotCounterReset
}
}
test.RequireHistogramEqual(t, expH, h)
require.NoError(t, iter.Err())
@@ -156,8 +186,15 @@ func testSeek(t require.TestingT, points int, iter chunkenc.Iterator, encoding c
ts, fh := iter.AtFloatHistogram(nil)
require.EqualValues(t, expectedTS, ts)
expFH := test.GenerateTestFloatHistogram(int(expectedTS))
if expectedTS > 0 {
expFH.CounterResetHint = histogram.NotCounterReset
if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
if fh.CounterResetHint == histogram.NotCounterReset {
fh.CounterResetHint = histogram.UnknownCounterReset
expFH.CounterResetHint = histogram.UnknownCounterReset
}
} else {
if expectedTS > 0 {
expFH.CounterResetHint = histogram.NotCounterReset
}
}
test.RequireFloatHistogramEqual(t, expFH, fh)
require.NoError(t, iter.Err())
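The same relaxation block now appears four times across testIter and testSeek. A possible follow-up refactor, not part of this PR, would centralize it in a helper living in the same test package and reusing the existing slices and histogram imports:

// adjustExpectedHint mutates the actual and expected hints so that, when the
// setNotCounterResetHintsAsUnknown option is set, a NotCounterReset hint on the
// actual sample is compared as UnknownCounterReset on both sides; otherwise it
// applies the default expectation that every sample after the first one is
// NotCounterReset.
func adjustExpectedHint(opts []testBatchOptions, afterFirstSample bool, actual, expected *histogram.CounterResetHint) {
	if slices.Contains(opts, setNotCounterResetHintsAsUnknown) {
		if *actual == histogram.NotCounterReset {
			*actual = histogram.UnknownCounterReset
			*expected = histogram.UnknownCounterReset
		}
		return
	}
	if afterFirstSample {
		*expected = histogram.NotCounterReset
	}
}

Each of the four blocks would then collapse to a single call such as adjustExpectedHint(opts, nextExpectedTS > 0, &h.CounterResetHint, &expH.CounterResetHint).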
8 changes: 4 additions & 4 deletions pkg/querier/batch/merge.go
@@ -50,7 +50,7 @@ func newMergeIterator(it iterator, cs []GenericChunk) *mergeIterator {
c.batches = newBatchStream(len(c.its), &c.hPool, &c.fhPool)
}
for i, cs := range css {
c.its[i] = newNonOverlappingIterator(c.its[i], cs, &c.hPool, &c.fhPool)
c.its[i] = newNonOverlappingIterator(c.its[i], i, cs, &c.hPool, &c.fhPool)
}

for _, iter := range c.its {
@@ -138,7 +138,7 @@ func (c *mergeIterator) buildNextBatch(size int) chunkenc.ValueType {
// is before all iterators next entry.
for len(c.h) > 0 && (c.batches.len() == 0 || c.nextBatchEndTime() >= c.h[0].AtTime()) {
batch := c.h[0].Batch()
c.batches.merge(&batch, size)
c.batches.merge(&batch, size, c.h[0].id)

if c.h[0].Next(size) != chunkenc.ValNone {
heap.Fix(&c.h, 0)
@@ -165,7 +165,7 @@ func (c *mergeIterator) Err() error {
return c.currErr
}

type iteratorHeap []iterator
type iteratorHeap []*nonOverlappingIterator

func (h *iteratorHeap) Len() int { return len(*h) }
func (h *iteratorHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
@@ -177,7 +177,7 @@ func (h *iteratorHeap) Less(i, j int) bool {
}

func (h *iteratorHeap) Push(x interface{}) {
*h = append(*h, x.(iterator))
*h = append(*h, x.(*nonOverlappingIterator))
}

func (h *iteratorHeap) Pop() interface{} {
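The merge.go changes above switch iteratorHeap to hold the concrete *nonOverlappingIterator so that buildNextBatch can read the per-source id it now passes into batchStream.merge. As a self-contained sketch of that shape, using illustrative stand-in types rather than Mimir's, here is a heap-driven k-way merge that tags every output point with the id of the stream it came from:

package main

import (
	"container/heap"
	"fmt"
)

// point and stream are illustrative stand-ins, not Mimir's batch/iterator types.
type point struct {
	t        int64
	sourceID int
}

type stream struct {
	id     int
	points []point
}

type streamHeap []*stream

func (h streamHeap) Len() int           { return len(h) }
func (h streamHeap) Less(i, j int) bool { return h[i].points[0].t < h[j].points[0].t }
func (h streamHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *streamHeap) Push(x interface{}) { *h = append(*h, x.(*stream)) }

func (h *streamHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// merge interleaves the streams by timestamp and records which source each point
// came from, mirroring how mergeIterator passes c.h[0].id into batchStream.merge
// so that hint correction can happen wherever the source changes.
func merge(streams []*stream) []point {
	h := streamHeap{}
	for _, s := range streams {
		if len(s.points) > 0 {
			h = append(h, s)
		}
	}
	heap.Init(&h)

	var out []point
	for h.Len() > 0 {
		s := h[0]
		p := s.points[0]
		p.sourceID = s.id
		out = append(out, p)

		s.points = s.points[1:]
		if len(s.points) > 0 {
			heap.Fix(&h, 0)
		} else {
			heap.Pop(&h)
		}
	}
	return out
}

func main() {
	inOrder := &stream{id: 0, points: []point{{t: 0}, {t: 2}, {t: 4}}}
	outOfOrder := &stream{id: 1, points: []point{{t: 1}, {t: 3}}}
	fmt.Println(merge([]*stream{inOrder, outOfOrder})) // [{0 0} {1 1} {2 0} {3 1} {4 0}]
}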