diff --git a/CHANGELOG.md b/CHANGELOG.md index 9427884e4c..c70e18003b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks. - [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block. - [#2936](https://github.com/thanos-io/thanos/pull/2936) Compact: Fix ReplicaLabelRemover panic when replicaLabels are not specified. +- [#2956](https://github.com/thanos-io/thanos/pull/2956) Store: Fix fetching of chunks bigger than 16000 bytes. ### Added diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index fcba1760f0..f511149b0b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1966,14 +1966,15 @@ func (r *bucketIndexReader) Close() error { type bucketChunkReader struct { ctx context.Context block *bucketBlock - stats *queryStats preloads [][]uint32 - mtx sync.Mutex - chunks map[uint64]chunkenc.Chunk - // Byte slice to return to the chunk pool on close. - chunkBytes []*[]byte + // Mutex protects access to following fields, when updated from chunks-loading goroutines. + // After chunks are loaded, mutex is no longer used. + mtx sync.Mutex + chunks map[uint64]chunkenc.Chunk + stats *queryStats + chunkBytes []*[]byte // Byte slice to return to the chunk pool on close. } func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader { @@ -2026,21 +2027,29 @@ func (r *bucketChunkReader) preload() error { return g.Wait() } +// loadChunks will read range [start, end] from the segment file with sequence number seq. +// This data range covers chunks starting at supplied offsets. func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error { - begin := time.Now() + fetchBegin := time.Now() b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start)) if err != nil { return errors.Wrapf(err, "read range for %d", seq) } + locked := true r.mtx.Lock() - defer r.mtx.Unlock() + + defer func() { + if locked { + r.mtx.Unlock() + } + }() r.chunkBytes = append(r.chunkBytes, b) r.stats.chunksFetchCount++ r.stats.chunksFetched += len(offs) - r.stats.chunksFetchDurationSum += time.Since(begin) + r.stats.chunksFetchDurationSum += time.Since(fetchBegin) r.stats.chunksFetchedSizeSum += int(end - start) for _, o := range offs { @@ -2050,11 +2059,44 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i if n < 1 { return errors.New("reading chunk length failed") } - if len(cb) < n+int(l)+1 { - return errors.Errorf("preloaded chunk too small, expecting %d", n+int(l)+1) + + chunkRef := uint64(seq<<32) | uint64(o) + + // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data. + // There is also crc32 after the chunk, but we ignore that. + chLen := n + 1 + int(l) + if len(cb) >= chLen { + r.chunks[chunkRef] = rawChunk(cb[n:chLen]) + continue + } + + // If we didn't fetch enough data for the chunk, fetch more. This can only really happen for last + // chunk in the list of fetched chunks, otherwise partitioner would merge fetch ranges together. + r.mtx.Unlock() + locked = false + + fetchBegin = time.Now() + + // Read entire chunk into new buffer. + nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen)) + if err != nil { + return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen) } - cid := uint64(seq<<32) | uint64(o) - r.chunks[cid] = rawChunk(cb[n : n+int(l)+1]) + + cb = *nb + if len(cb) != chLen { + return errors.Errorf("preloaded chunk too small, expecting %d", chLen) + } + + r.mtx.Lock() + locked = true + + r.chunkBytes = append(r.chunkBytes, nb) + r.stats.chunksFetchCount++ + r.stats.chunksFetchDurationSum += time.Since(fetchBegin) + r.stats.chunksFetchedSizeSum += len(cb) + + r.chunks[chunkRef] = rawChunk(cb[n:]) } return nil } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 951b83f732..0d39b16d50 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/encoding" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -1729,3 +1730,133 @@ func TestBigEndianPostingsCount(t *testing.T) { } testutil.Equals(t, count, c) } + +func TestBlockWithLargeChunks(t *testing.T) { + tmpDir, err := ioutil.TempDir(os.TempDir(), "large-chunk-test") + testutil.Ok(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(tmpDir) + }) + + blockDir := filepath.Join(tmpDir, "block") + b := createBlockWithLargeChunk(testutil.NewTB(t), blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0))) + + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, b.String()), thanosMeta, nil) + testutil.Ok(t, err) + + bucketDir := filepath.Join(os.TempDir(), "bkt") + bkt, err := filesystem.NewBucket(bucketDir) + testutil.Ok(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(bucketDir) + }) + + logger := log.NewNopLogger() + instrBkt := objstore.WithNoopInstr(bkt) + + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, b.String()))) + + // Instance a real bucket store we'll use to query the series. + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil) + testutil.Ok(t, err) + + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) + testutil.Ok(t, err) + + store, err := NewBucketStore( + logger, + nil, + instrBkt, + fetcher, + tmpDir, + indexCache, + nil, + 1000000, + NewChunksLimiterFactory(10000/MaxSamplesPerChunk), + false, + 10, + nil, + false, + true, + DefaultPostingOffsetInMemorySampling, + true, + ) + testutil.Ok(t, err) + testutil.Ok(t, store.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "test"}, + }, + } + srv := newStoreSeriesServer(context.Background()) + testutil.Ok(t, store.Series(req, srv)) + testutil.Equals(t, 1, len(srv.SeriesSet)) +} + +// This method relies on a bug in TSDB Compactor which will just merge overlapping chunks into one big chunk. +// If compactor is fixed in the future, we may need a different way of generating the block, or commit +// existing block to the repository. +func createBlockWithLargeChunk(t testutil.TB, dir string, lbls labels.Labels, random *rand.Rand) ulid.ULID { + // Block covering time [0 ... 10000) + b1 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 10000, random, 1) + + // This block has only 11 samples that fit into one chunk, but it completely overlaps entire first block. + // Last sample has higher timestamp than last sample in b1. + // This will make compactor to merge all chunks into one. + b2 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 11, random, 1000) + + // Merge the blocks together. + compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) + testutil.Ok(t, err) + + blocksToCompact := []string{filepath.Join(dir, b1.String()), filepath.Join(dir, b2.String())} + newBlock, err := compactor.Compact(dir, blocksToCompact, nil) + testutil.Ok(t, err) + + for _, b := range blocksToCompact { + err := os.RemoveAll(b) + testutil.Ok(t, err) + } + + db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions()) + testutil.Ok(t, err) + bs := db.Blocks() + testutil.Equals(t, 1, len(bs)) + cr, err := bs[0].Chunks() + testutil.Ok(t, err) + // Ref is ( << 32 + offset in the file). In TSDB v1 first chunk is always at offset 8. + c, err := cr.Chunk(8) + testutil.Ok(t, err) + + // Make sure that this is really a big chunk, otherwise this method makes a false promise. + testutil.Equals(t, 10001, c.NumSamples()) + + return newBlock +} + +func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand, step int64) ulid.ULID { + h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples)*step, dir, nil, tsdb.DefaultStripeSize, nil) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, h.Close()) }() + + app := h.Appender() + + ts := int64(blockIndex * totalSamples) + ref, err := app.Add(lbls, ts, random.Float64()) + testutil.Ok(t, err) + for i := 1; i < totalSamples; i++ { + testutil.Ok(t, app.AddFast(ref, ts+step*int64(i), random.Float64())) + } + testutil.Ok(t, app.Commit()) + + return createBlockFromHead(t, dir, h) +}