Skip to content

Commit

Permalink
Merge pull request #692 from grafana/pick-pr14874
Browse files Browse the repository at this point in the history
Pick pr14874
  • Loading branch information
krajorama authored Sep 9, 2024
2 parents 53cf3fb + 36acab5 commit 78e8657
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 12 deletions.
64 changes: 52 additions & 12 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3564,6 +3564,56 @@ func TestWaitForPendingReadersInTimeRange(t *testing.T) {
}

func TestQueryOOOHeadDuringTruncate(t *testing.T) {
testQueryOOOHeadDuringTruncate(t,
func(db *DB, minT, maxT int64) (storage.LabelQuerier, error) {
return db.Querier(minT, maxT)
},
func(t *testing.T, lq storage.LabelQuerier, minT, _ int64) {
// Samples
q, ok := lq.(storage.Querier)
require.True(t, ok)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next()) // One series.
it := s.Iterator(nil)
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
require.Equal(t, minT, it.AtT()) // It is an in-order sample.
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
require.Equal(t, minT+50, it.AtT()) // it is an out-of-order sample.
require.NoError(t, it.Err())
},
)
}

func TestChunkQueryOOOHeadDuringTruncate(t *testing.T) {
testQueryOOOHeadDuringTruncate(t,
func(db *DB, minT, maxT int64) (storage.LabelQuerier, error) {
return db.ChunkQuerier(minT, maxT)
},
func(t *testing.T, lq storage.LabelQuerier, minT, _ int64) {
// Chunks
q, ok := lq.(storage.ChunkQuerier)
require.True(t, ok)
ss := q.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next()) // One series.
metaIt := s.Iterator(nil)
require.True(t, metaIt.Next())
meta := metaIt.At()
// Samples
it := meta.Chunk.Iterator(nil)
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
require.Equal(t, minT, it.AtT()) // It is an in-order sample.
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
require.Equal(t, minT+50, it.AtT()) // it is an out-of-order sample.
require.NoError(t, it.Err())
},
)
}

func testQueryOOOHeadDuringTruncate(t *testing.T, makeQuerier func(db *DB, minT, maxT int64) (storage.LabelQuerier, error), verify func(t *testing.T, q storage.LabelQuerier, minT, maxT int64)) {
const maxT int64 = 6000

dir := t.TempDir()
Expand Down Expand Up @@ -3616,7 +3666,7 @@ func TestQueryOOOHeadDuringTruncate(t *testing.T) {
// Wait for the compaction to start.
<-allowQueryToStart

q, err := db.Querier(1500, 2500)
q, err := makeQuerier(db, 1500, 2500)
require.NoError(t, err)
queryStarted <- struct{}{} // Unblock the compaction.
ctx := context.Background()
Expand All @@ -3633,17 +3683,7 @@ func TestQueryOOOHeadDuringTruncate(t *testing.T) {
require.Empty(t, annots)
require.Equal(t, []string{"b"}, res)

// Samples
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.True(t, ss.Next())
s := ss.At()
require.False(t, ss.Next()) // One series.
it := s.Iterator(nil)
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
require.Equal(t, int64(1500), it.AtT()) // It is an in-order sample.
require.NotEqual(t, chunkenc.ValNone, it.Next()) // Has some data.
require.Equal(t, int64(1550), it.AtT()) // it is an out-of-order sample.
require.NoError(t, it.Err())
verify(t, q, 1500, 2500)

require.NoError(t, q.Close()) // Cannot be deferred as the compaction waits for queries to close before finishing.

Expand Down
9 changes: 9 additions & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,15 +602,24 @@ func NewHeadAndOOOChunkQuerier(mint, maxt int64, head *Head, oooIsoState *oooIso
}

func (q *HeadAndOOOChunkQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
if q.querier == nil {
return nil, nil, nil
}
return q.querier.LabelValues(ctx, name, hints, matchers...)
}

func (q *HeadAndOOOChunkQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
if q.querier == nil {
return nil, nil, nil
}
return q.querier.LabelNames(ctx, hints, matchers...)
}

func (q *HeadAndOOOChunkQuerier) Close() error {
q.chunkr.Close()
if q.querier == nil {
return nil
}
return q.querier.Close()
}

Expand Down

0 comments on commit 78e8657

Please sign in to comment.