store: Refetch series if longer than expected.
Fixes: #1983

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Jan 10, 2020
1 parent 3cf366a commit 43e3f12
Showing 3 changed files with 115 additions and 18 deletions.
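The change in a nutshell: the Store Gateway reads each series from the index assuming a fixed upper bound per entry (`maxSeriesSize`, 64 KiB); an entry larger than that used to fail hard with "invalid remaining size". After this commit the reader notices, from the uvarint length prefix, that the fetched window is too short and refetches exactly the bytes it needs, once, before giving up. A minimal, self-contained sketch of that pattern (hypothetical names, heavily simplified; the real logic is in the `loadSeries` hunk below):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// estimate plays the role of maxSeriesSize: the assumed upper bound per record.
const estimate = 16

// readRecord reads the length-prefixed record at off, re-reading with the
// exact size when the uvarint prefix reveals the record outgrows the window.
func readRecord(data []byte, off int) ([]byte, error) {
	end := off + estimate
	if end > len(data) {
		end = len(data)
	}
	window := data[off:end]

	l, n := binary.Uvarint(window)
	if n < 1 {
		return nil, errors.New("reading record length failed")
	}
	if len(window) < n+int(l) {
		// "Refetch": take exactly n+l bytes now that the true size is known.
		if off+n+int(l) > len(data) {
			return nil, errors.New("record truncated, even after refetch")
		}
		window = data[off : off+n+int(l)]
	}
	return window[n : n+int(l)], nil
}

func main() {
	payload := []byte("a record much longer than the sixteen-byte estimate")
	buf := binary.AppendUvarint(nil, uint64(len(payload)))
	buf = append(buf, payload...)

	rec, err := readRecord(buf, 0)
	fmt.Println(string(rec), err)
}
```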
pkg/objstore/inmem/inmem.go (7 additions, 4 deletions)

```diff
@@ -1,14 +1,13 @@
 package inmem
 
 import (
-	"bytes"
 	"context"
 	"io"
-	"sort"
-	"sync"
-
+	"bytes"
 	"io/ioutil"
+	"sort"
 	"strings"
+	"sync"
 
 	"github.com/pkg/errors"
 	"github.com/thanos-io/thanos/pkg/objstore"
@@ -123,6 +122,10 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
 		return ioutil.NopCloser(bytes.NewReader(file[off:])), nil
 	}
 
+	if length <= 0 {
+		return ioutil.NopCloser(bytes.NewReader(nil)), errors.New("length cannot be smaller than 0")
+	}
+
 	if int64(len(file)) <= off+length {
 		// Just return maximum of what we have.
 		length = int64(len(file)) - off
```
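A quick usage sketch of the new guard, assuming the v0.10-era `objstore/inmem` API visible in this diff (`NewBucket`, `Upload`, and `GetRange` all appear above or in the test below; the exact upstream semantics of negative lengths are an assumption here):

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"

	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	ctx := context.Background()
	bkt := inmem.NewBucket()
	if err := bkt.Upload(ctx, "obj", bytes.NewReader([]byte("0123456789"))); err != nil {
		panic(err)
	}

	// A normal ranged read: off=2, length=4 returns "2345".
	rc, err := bkt.GetRange(ctx, "obj", 2, 4)
	if err != nil {
		panic(err)
	}
	defer rc.Close()
	b, _ := ioutil.ReadAll(rc)
	fmt.Printf("%s\n", b)

	// With the guard above, a non-positive length (past the "read to end"
	// fast path) is rejected instead of being silently misread.
	_, err = bkt.GetRange(ctx, "obj", 2, 0)
	fmt.Println(err) // length cannot be smaller than 0
}
```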
pkg/store/bucket.go (33 additions, 14 deletions)

```diff
@@ -57,6 +57,8 @@ const (
 
 	maxChunkSize = 16000
 
+	maxSeriesSize = 64 * 1024
+
 	// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
 	// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
 	// Now with newer Store Gateway advertising all the external labels it has access to, there was simple case where
@@ -87,6 +89,7 @@ type bucketStoreMetrics struct {
 	chunkSizeBytes  prometheus.Histogram
 	queriesDropped  prometheus.Counter
 	queriesLimit    prometheus.Gauge
+	seriesRefetches prometheus.Counter
 }
 
 func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
@@ -166,6 +169,10 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 		Name: "thanos_bucket_store_queries_concurrent_max",
 		Help: "Number of maximum concurrent queries.",
 	})
+	m.seriesRefetches = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "thanos_bucket_store_series_refetches_total",
+		Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
+	})
 
 	if reg != nil {
 		reg.MustRegister(
@@ -185,6 +192,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 			m.chunkSizeBytes,
 			m.queriesDropped,
 			m.queriesLimit,
+			m.seriesRefetches,
 		)
 	}
 	return &m
@@ -454,6 +462,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
 		s.chunkPool,
 		jr,
 		s.partitioner,
+		s.metrics.seriesRefetches,
 	)
 	if err != nil {
 		return errors.Wrap(err, "new bucket block")
```
pkg/store/bucket.go (continued)

```diff
@@ -601,8 +610,6 @@ func (s *bucketSeriesSet) Err() error {
 }
 
 func blockSeries(
-	ctx context.Context,
-	ulid ulid.ULID,
 	extLset map[string]string,
 	indexr *bucketIndexReader,
 	chunkr *bucketChunkReader,
@@ -845,8 +852,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
 		defer runutil.CloseWithLogOnErr(s.logger, chunkr, "series block")
 
 		g.Go(func() error {
-			part, pstats, err := blockSeries(ctx,
-				b.meta.ULID,
+			part, pstats, err := blockSeries(
 				b.meta.Thanos.Labels,
 				indexr,
 				chunkr,
@@ -1156,6 +1162,8 @@ type bucketBlock struct {
 	pendingReaders sync.WaitGroup
 
 	partitioner partitioner
+
+	seriesRefetches prometheus.Counter
 }
 
 func newBucketBlock(
@@ -1168,6 +1176,7 @@ func newBucketBlock(
 	chunkPool *pool.BytesPool,
 	indexHeadReader indexheader.Reader,
 	p partitioner,
+	seriesRefetches prometheus.Counter,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
 		logger:            logger,
@@ -1178,6 +1187,7 @@ func newBucketBlock(
 		partitioner:       p,
 		meta:              meta,
 		indexHeaderReader: indexHeadReader,
+		seriesRefetches:   seriesRefetches,
 	}
 
 	// Get object handles for all chunk files.
@@ -1484,12 +1494,11 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 			fetchTime := time.Since(begin)
 
 			r.mtx.Lock()
-			defer r.mtx.Unlock()
-
 			r.stats.postingsFetchCount++
 			r.stats.postingsFetched += j - i
 			r.stats.postingsFetchDurationSum += fetchTime
 			r.stats.postingsFetchedSizeSum += int(length)
+			r.mtx.Unlock()
 
 			for _, p := range ptrs[i:j] {
 				c := b[p.ptr.Start-start : p.ptr.End-start]
@@ -1499,13 +1508,15 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 					return errors.Wrap(err, "read postings list")
 				}
 
+				r.mtx.Lock()
 				// Return postings and fill LRU cache.
 				groups[p.groupID].Fill(p.keyID, fetchedPostings)
 				r.cache.StorePostings(r.ctx, r.block.meta.ULID, groups[p.groupID].keys[p.keyID], c)
 
 				// If we just fetched it we still have to update the stats for touched postings.
 				r.stats.postingsTouched++
 				r.stats.postingsTouchedSizeSum += len(c)
+				r.mtx.Unlock()
 			}
 			return nil
 		})
@@ -1515,8 +1526,6 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error {
 }
 
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
-	const maxSeriesSize = 64 * 1024
-
 	// Load series from cache, overwriting the list of ids to preload
 	// with the missing ones.
 	fromCache, ids := r.cache.FetchMultiSeries(r.ctx, r.block.meta.ULID, ids)
@@ -1533,13 +1542,13 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
 		i, j := p.elemRng[0], p.elemRng[1]
 
 		g.Go(func() error {
-			return r.loadSeries(ctx, ids[i:j], s, e)
+			return r.loadSeries(ctx, ids[i:j], false, s, e)
 		})
 	}
 	return g.Wait()
 }
 
-func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start, end uint64) error {
+func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, refetch bool, start, end uint64) error {
 	begin := time.Now()
 
 	b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
@@ -1548,26 +1557,36 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []uint64, start,
 	}
 
 	r.mtx.Lock()
-	defer r.mtx.Unlock()
-
 	r.stats.seriesFetchCount++
 	r.stats.seriesFetched += len(ids)
 	r.stats.seriesFetchDurationSum += time.Since(begin)
 	r.stats.seriesFetchedSizeSum += int(end - start)
+	r.mtx.Unlock()
 
-	for _, id := range ids {
+	for i, id := range ids {
 		c := b[id-start:]
 
 		l, n := binary.Uvarint(c)
 		if n < 1 {
 			return errors.New("reading series length failed")
 		}
 		if len(c) < n+int(l) {
-			return errors.Errorf("invalid remaining size %d, expected %d", len(c), n+int(l))
+			if i == 0 && refetch {
+				return errors.Errorf("invalid remaining size, even after refetch, remaining: %d, expected %d", len(c), n+int(l))
+			}
+
+			// Inefficient, but should be rare.
+			r.block.seriesRefetches.Inc()
+			level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)
+
+			// Fetch plus to get the size of next one if exists.
+			return r.loadSeries(ctx, ids[i:], true, id, id+uint64(n+int(l)+1))
 		}
 		c = c[n : n+int(l)]
+		r.mtx.Lock()
 		r.loadedSeries[id] = c
 		r.cache.StoreSeries(r.ctx, r.block.meta.ULID, id, c)
+		r.mtx.Unlock()
 	}
 	return nil
 }
```
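One detail worth calling out in `loadSeries`: the `defer r.mtx.Unlock()` is replaced with explicit, tightly scoped Lock/Unlock pairs because the function can now call itself to refetch, and Go's `sync.Mutex` is not reentrant, so holding the lock across the recursive call would self-deadlock. A minimal illustration of the hazard and the fix (hypothetical code, not from the commit):

```go
package main

import "sync"

type reader struct {
	mtx    sync.Mutex
	loaded map[uint64][]byte
}

func (r *reader) load(ids []uint64, refetch bool) {
	// BAD: `r.mtx.Lock(); defer r.mtx.Unlock()` here would deadlock on the
	// recursive call below, since the same goroutine re-acquires r.mtx.

	// GOOD: hold the lock only around each shared-state update.
	r.mtx.Lock()
	// ... update fetch stats ...
	r.mtx.Unlock()

	for i, id := range ids {
		if needsRefetch(id) && !refetch {
			r.load(ids[i:], true) // safe: the mutex is not held here
			return
		}
		r.mtx.Lock()
		r.loaded[id] = nil // store the decoded series bytes
		r.mtx.Unlock()
	}
}

func needsRefetch(id uint64) bool { return false }

func main() {
	r := &reader{loaded: map[uint64][]byte{}}
	r.load([]uint64{1, 2, 3}, false)
}
```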
pkg/store/bucket_test.go (75 additions, 0 deletions)

```diff
@@ -1,6 +1,7 @@
 package store
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -20,8 +21,11 @@ import (
 	"github.com/leanovate/gopter/gen"
 	"github.com/leanovate/gopter/prop"
 	"github.com/oklog/ulid"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/encoding"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -760,3 +764,74 @@ func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []uli
 	sort.Strings(ops)
 	return ops
 }
+
+// Regression tests against: https://github.com/thanos-io/thanos/issues/1983.
+func TestReadIndexCache_LoadSeries(t *testing.T) {
+	bkt := inmem.NewBucket()
+
+	s := newBucketStoreMetrics(nil)
+	b := &bucketBlock{
+		meta: &metadata.Meta{
+			BlockMeta: tsdb.BlockMeta{ULID: ulid.MustNew(1, nil)},
+		},
+		bucket:          bkt,
+		seriesRefetches: s.seriesRefetches,
+		logger:          log.NewNopLogger(),
+	}
+
+	buf := encoding.Encbuf{}
+	buf.PutByte(0)
+	buf.PutByte(0)
+	buf.PutUvarint(10)
+	buf.PutString("aaaaaaaaaa")
+	buf.PutUvarint(10)
+	buf.PutString("bbbbbbbbbb")
+	buf.PutUvarint(10)
+	buf.PutString("cccccccccc")
+	testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))
+
+	r := bucketIndexReader{
+		block:        b,
+		stats:        &queryStats{},
+		loadedSeries: map[uint64][]byte{},
+		cache:        noopCache{},
+		logger:       log.NewNopLogger(),
+	}
+
+	// Success with no refetches.
+	testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 2, 100))
+	testutil.Equals(t, map[uint64][]byte{
+		2:  []byte("aaaaaaaaaa"),
+		13: []byte("bbbbbbbbbb"),
+		24: []byte("cccccccccc"),
+	}, r.loadedSeries)
+	testutil.Equals(t, float64(0), promtest.ToFloat64(s.seriesRefetches))
+
+	// Success with 2 refetches.
+	r.loadedSeries = map[uint64][]byte{}
+	testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 2, 15))
+	testutil.Equals(t, map[uint64][]byte{
+		2:  []byte("aaaaaaaaaa"),
+		13: []byte("bbbbbbbbbb"),
+		24: []byte("cccccccccc"),
+	}, r.loadedSeries)
+	testutil.Equals(t, float64(2), promtest.ToFloat64(s.seriesRefetches))
+
+	// Success with refetch on first element.
+	r.loadedSeries = map[uint64][]byte{}
+	testutil.Ok(t, r.loadSeries(context.TODO(), []uint64{2}, false, 2, 5))
+	testutil.Equals(t, map[uint64][]byte{
+		2: []byte("aaaaaaaaaa"),
+	}, r.loadedSeries)
+	testutil.Equals(t, float64(3), promtest.ToFloat64(s.seriesRefetches))
+
+	buf.Reset()
+	buf.PutByte(0)
+	buf.PutByte(0)
+	buf.PutUvarint(10)
+	buf.PutString("aaaaaaa")
+	testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get())))
+
+	// Fail, but no recursion at least.
+	testutil.NotOk(t, r.loadSeries(context.TODO(), []uint64{2, 13, 24}, false, 1, 15))
+}
```
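How the fixture lines up with the refetch logic: the fake index starts with two zero padding bytes, then three entries of uvarint(length) plus payload, which places the entries at byte offsets 2, 13, and 24, exactly the IDs the test passes to `loadSeries`. In the second case, `end=15` means only the first entry fits in the fetched range, so offsets 13 and 24 each trigger one refetch, matching the asserted counter value of 2. A standalone decode of that layout:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	buf := []byte{0, 0} // loadSeries treats series IDs as byte offsets; offset 0 is padding.
	for _, s := range []string{"aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc"} {
		buf = binary.AppendUvarint(buf, uint64(len(s)))
		buf = append(buf, s...)
	}

	for off := 2; off < len(buf); {
		l, n := binary.Uvarint(buf[off:])
		fmt.Printf("entry at offset %d: %q\n", off, buf[off+n:off+n+int(l)])
		off += n + int(l)
	}
	// entry at offset 2: "aaaaaaaaaa"
	// entry at offset 13: "bbbbbbbbbb"
	// entry at offset 24: "cccccccccc"
}
```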
