From a400ef3d965a86f918adbb24c89ff85f393159fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 16:27:12 +0200 Subject: [PATCH 1/8] Added unit test showing issue with large chunk in the block. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket_test.go | 117 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 951b83f732..f317f19b9c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1729,3 +1729,120 @@ func TestBigEndianPostingsCount(t *testing.T) { } testutil.Equals(t, count, c) } + +func TestBlockWithLargeChunks(t *testing.T) { + tb := testutil.NewTB(t) + + tmpDir, err := ioutil.TempDir(os.TempDir(), "large-chunk-test") + testutil.Ok(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(tmpDir) + }) + + blockDir := filepath.Join(tmpDir, "block") + b := createBlockWithLargeChunk(t, tb, blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0)), err) + + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, b.String()), thanosMeta, nil) + testutil.Ok(t, err) + + bucketDir := filepath.Join(os.TempDir(), "bkt") + bkt, err := filesystem.NewBucket(bucketDir) + testutil.Ok(t, err) + t.Cleanup(func() { + _ = os.RemoveAll(bucketDir) + }) + + logger := log.NewNopLogger() + instrBkt := objstore.WithNoopInstr(bkt) + + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, b.String()))) + + // Instance a real bucket store we'll use to query the series. + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil) + testutil.Ok(tb, err) + + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) + testutil.Ok(tb, err) + + store, err := NewBucketStore( + logger, + nil, + instrBkt, + fetcher, + tmpDir, + indexCache, + nil, + 1000000, + NewChunksLimiterFactory(10000/MaxSamplesPerChunk), + false, + 10, + nil, + false, + true, + DefaultPostingOffsetInMemorySampling, + true, + ) + testutil.Ok(tb, err) + testutil.Ok(tb, store.SyncBlocks(context.Background())) + + req := &storepb.SeriesRequest{ + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "test"}, + }, + } + srv := newStoreSeriesServer(context.Background()) + testutil.Ok(t, store.Series(req, srv)) +} + +// This method relies on a bug in TSDB Compactor which will just merge overlapping chunks into one big chunk. +// If compactor is fixed in the future, we may need a different way of generating the block, or commit +// existing block to the repository. +func createBlockWithLargeChunk(t *testing.T, tb testutil.TB, dir string, lbls labels.Labels, random *rand.Rand, err error) ulid.ULID { + // Block covering time [0 ... 10000) + b1 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 10000, random, 1) + + // This block has only 10 samples, but it completely overlaps entire first block. + // This will make compactor to merge all chunks into one. + b2 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 11, random, 1000) + + // Merge the blocks together. 
+ compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) + testutil.Ok(t, err) + + blocksToCompact := []string{filepath.Join(dir, b1.String()), filepath.Join(dir, b2.String())} + newBlock, err := compactor.Compact(dir, blocksToCompact, nil) + testutil.Ok(t, err) + + for _, b := range blocksToCompact { + err := os.RemoveAll(b) + testutil.Ok(t, err) + } + + return newBlock +} + +func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand, step int64) ulid.ULID { + h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples)*step, dir, nil, tsdb.DefaultStripeSize, nil) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, h.Close()) }() + + app := h.Appender() + + ts := int64(blockIndex * totalSamples) + ref, err := app.Add(lbls, ts, random.Float64()) + testutil.Ok(t, err) + for i := 1; i < totalSamples; i++ { + testutil.Ok(t, app.AddFast(ref, ts+step*int64(i), random.Float64())) + } + testutil.Ok(t, app.Commit()) + + return createBlockFromHead(t, dir, h) +} From a6ee935eabedfe3ba39497250269fc7d479206ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 16:48:13 +0200 Subject: [PATCH 2/8] Make sure that large chunk is indeed created. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f317f19b9c..06c37534f6 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/encoding" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -1809,7 +1810,8 @@ func createBlockWithLargeChunk(t *testing.T, tb testutil.TB, dir string, lbls la // Block covering time [0 ... 10000) b1 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 10000, random, 1) - // This block has only 10 samples, but it completely overlaps entire first block. + // This block has only 11 samples that fit into one chunk, but it completely overlaps entire first block. + // Last sample has higher timestamp than last sample in b1. // This will make compactor to merge all chunks into one. b2 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 11, random, 1000) @@ -1826,6 +1828,19 @@ func createBlockWithLargeChunk(t *testing.T, tb testutil.TB, dir string, lbls la testutil.Ok(t, err) } + db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions()) + testutil.Ok(t, err) + bs := db.Blocks() + testutil.Equals(t, 1, len(bs)) + cr, err := bs[0].Chunks() + testutil.Ok(t, err) + // Ref is ( << 32 + offset in the file). In TSDB v1 first chunk is always at offset 8. + c, err := cr.Chunk(8) + testutil.Ok(t, err) + + // Make sure that this is really a big chunk, otherwise this method makes a false promise. + testutil.Equals(t, 10001, c.NumSamples()) + return newBlock } From 170fdc4fdcd2f3c7dd15d59213b6d51dcd69519e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 17:25:40 +0200 Subject: [PATCH 3/8] If chunk is not fully fetched, fetch remaining data. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 55 +++++++++++++++++++++++++++++++++++----- pkg/store/bucket_test.go | 1 + 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index fcba1760f0..5584a91158 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -2026,21 +2026,29 @@ func (r *bucketChunkReader) preload() error { return g.Wait() } +// loadChunks will read range [start, end] from the segment file with sequence number seq. +// This data range covers chunks starting at supplied offsets. func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error { - begin := time.Now() + fetchBegin := time.Now() b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start)) if err != nil { return errors.Wrapf(err, "read range for %d", seq) } + locked := true r.mtx.Lock() - defer r.mtx.Unlock() + + defer func() { + if locked { + r.mtx.Unlock() + } + }() r.chunkBytes = append(r.chunkBytes, b) r.stats.chunksFetchCount++ r.stats.chunksFetched += len(offs) - r.stats.chunksFetchDurationSum += time.Since(begin) + r.stats.chunksFetchDurationSum += time.Since(fetchBegin) r.stats.chunksFetchedSizeSum += int(end - start) for _, o := range offs { @@ -2050,11 +2058,44 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i if n < 1 { return errors.New("reading chunk length failed") } - if len(cb) < n+int(l)+1 { - return errors.Errorf("preloaded chunk too small, expecting %d", n+int(l)+1) + + chunkRef := uint64(seq<<32) | uint64(o) + + // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data. + // There is also crc32 after the chunk, but we ignore that. + chLen := n + 1 + int(l) + if len(cb) >= chLen { + r.chunks[chunkRef] = rawChunk(cb[n:chLen]) + continue } - cid := uint64(seq<<32) | uint64(o) - r.chunks[cid] = rawChunk(cb[n : n+int(l)+1]) + + // If we didn't fetch enough data for the chunk, fetch more. This can only really happen for last + // chunk in the list of fetched chunks, otherwise partitioner would merge fetch ranges together. + r.mtx.Unlock() + locked = false + + fetchBegin = time.Now() + + // Read entire chunk into new buffer. + nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen)) + if err != nil { + return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen) + } + + cb = *nb + if len(cb) != chLen { + return errors.Errorf("preloaded chunk too small, expecting %d", chLen) + } + + r.mtx.Lock() + locked = true + + r.chunkBytes = append(r.chunkBytes, nb) + r.stats.chunksFetchCount++ + r.stats.chunksFetchDurationSum += time.Since(fetchBegin) + r.stats.chunksFetchedSizeSum += len(cb) + + r.chunks[chunkRef] = rawChunk(cb[n:]) } return nil } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 06c37534f6..68400e0ee5 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1801,6 +1801,7 @@ func TestBlockWithLargeChunks(t *testing.T) { } srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, store.Series(req, srv)) + testutil.Equals(t, 1, len(srv.SeriesSet)) } // This method relies on a bug in TSDB Compactor which will just merge overlapping chunks into one big chunk. 
From 4e46562f7abe783e893e1c6655388b9a39c4ab5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 17:31:29 +0200 Subject: [PATCH 4/8] Added CHANGELOG.md entry. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9427884e4c..c70e18003b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks. - [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block. - [#2936](https://github.com/thanos-io/thanos/pull/2936) Compact: Fix ReplicaLabelRemover panic when replicaLabels are not specified. +- [#2956](https://github.com/thanos-io/thanos/pull/2956) Store: Fix fetching of chunks bigger than 16000 bytes. ### Added From 9f8973305f4a7ca5282d2903a7b2021231943bd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 17:38:11 +0200 Subject: [PATCH 5/8] Removed extra err parameter. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 68400e0ee5..87124103c7 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1741,7 +1741,7 @@ func TestBlockWithLargeChunks(t *testing.T) { }) blockDir := filepath.Join(tmpDir, "block") - b := createBlockWithLargeChunk(t, tb, blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0)), err) + b := createBlockWithLargeChunk(t, tb, blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0))) thanosMeta := metadata.Thanos{ Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), @@ -1807,7 +1807,7 @@ func TestBlockWithLargeChunks(t *testing.T) { // This method relies on a bug in TSDB Compactor which will just merge overlapping chunks into one big chunk. // If compactor is fixed in the future, we may need a different way of generating the block, or commit // existing block to the repository. -func createBlockWithLargeChunk(t *testing.T, tb testutil.TB, dir string, lbls labels.Labels, random *rand.Rand, err error) ulid.ULID { +func createBlockWithLargeChunk(t *testing.T, tb testutil.TB, dir string, lbls labels.Labels, random *rand.Rand) ulid.ULID { // Block covering time [0 ... 10000) b1 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 10000, random, 1) From 444734cf6fceda134cb973bc907494c617df461c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 17:38:57 +0200 Subject: [PATCH 6/8] Removed extra t. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 87124103c7..df641beac6 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1732,8 +1732,6 @@ func TestBigEndianPostingsCount(t *testing.T) { } func TestBlockWithLargeChunks(t *testing.T) { - tb := testutil.NewTB(t) - tmpDir, err := ioutil.TempDir(os.TempDir(), "large-chunk-test") testutil.Ok(t, err) t.Cleanup(func() { @@ -1741,7 +1739,7 @@ func TestBlockWithLargeChunks(t *testing.T) { }) blockDir := filepath.Join(tmpDir, "block") - b := createBlockWithLargeChunk(t, tb, blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0))) + b := createBlockWithLargeChunk(testutil.NewTB(t), blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0))) thanosMeta := metadata.Thanos{ Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), @@ -1807,14 +1805,14 @@ func TestBlockWithLargeChunks(t *testing.T) { // This method relies on a bug in TSDB Compactor which will just merge overlapping chunks into one big chunk. // If compactor is fixed in the future, we may need a different way of generating the block, or commit // existing block to the repository. -func createBlockWithLargeChunk(t *testing.T, tb testutil.TB, dir string, lbls labels.Labels, random *rand.Rand) ulid.ULID { +func createBlockWithLargeChunk(t testutil.TB, dir string, lbls labels.Labels, random *rand.Rand) ulid.ULID { // Block covering time [0 ... 10000) - b1 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 10000, random, 1) + b1 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 10000, random, 1) // This block has only 11 samples that fit into one chunk, but it completely overlaps entire first block. // Last sample has higher timestamp than last sample in b1. // This will make compactor to merge all chunks into one. - b2 := createBlockWithOneSeriesWithStep(tb, dir, lbls, 0, 11, random, 1000) + b2 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 11, random, 1000) // Merge the blocks together. compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil) From 64a1cb4ab800c2424ac35da07f0827d933995003 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 29 Jul 2020 17:39:27 +0200 Subject: [PATCH 7/8] Removed extra tb. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index df641beac6..0d39b16d50 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1764,10 +1764,10 @@ func TestBlockWithLargeChunks(t *testing.T) { // Instance a real bucket store we'll use to query the series. 
fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil) - testutil.Ok(tb, err) + testutil.Ok(t, err) indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) - testutil.Ok(tb, err) + testutil.Ok(t, err) store, err := NewBucketStore( logger, @@ -1787,8 +1787,8 @@ func TestBlockWithLargeChunks(t *testing.T) { DefaultPostingOffsetInMemorySampling, true, ) - testutil.Ok(tb, err) - testutil.Ok(tb, store.SyncBlocks(context.Background())) + testutil.Ok(t, err) + testutil.Ok(t, store.SyncBlocks(context.Background())) req := &storepb.SeriesRequest{ MinTime: math.MinInt64, From f690af7d6988aa874eb8e8099a5b9121b21b7145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 30 Jul 2020 17:21:07 +0200 Subject: [PATCH 8/8] Reorder bucketChunkReader to show which fields mutex protects. Also document mutex usage. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/store/bucket.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 5584a91158..f511149b0b 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1966,14 +1966,15 @@ func (r *bucketIndexReader) Close() error { type bucketChunkReader struct { ctx context.Context block *bucketBlock - stats *queryStats preloads [][]uint32 - mtx sync.Mutex - chunks map[uint64]chunkenc.Chunk - // Byte slice to return to the chunk pool on close. - chunkBytes []*[]byte + // Mutex protects access to following fields, when updated from chunks-loading goroutines. + // After chunks are loaded, mutex is no longer used. + mtx sync.Mutex + chunks map[uint64]chunkenc.Chunk + stats *queryStats + chunkBytes []*[]byte // Byte slice to return to the chunk pool on close. } func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader {
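---

For readers following the fix in PATCH 3/8: the change handles chunks whose encoded size exceeds the preloaded byte range (the >16000 byte case from the CHANGELOG entry) by re-fetching the full chunk on its own. Below is a minimal, standalone sketch of that layout and re-fetch decision. It is not Thanos code; the readRange type and chunkAt function are invented for illustration, and only the on-disk layout (uvarint data length, one encoding byte, data, trailing crc32) mirrors the patch.

// chunkfetch_sketch.go — illustrative only, assuming the segment-file chunk layout
// described in the patch: <uvarint data length><1 byte encoding><data><4 byte crc32>.
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readRange stands in for bucketBlock.readChunkRange: it returns `length` bytes
// of the segment file starting at `offset`. Hypothetical helper, not a real API.
type readRange func(offset, length int) ([]byte, error)

// chunkAt returns the <encoding byte + data> portion of the chunk that starts at
// segment offset `off`, given `preloaded` bytes beginning at segment offset `segStart`.
func chunkAt(preloaded []byte, segStart, off int, read readRange) ([]byte, error) {
	cb := preloaded[off-segStart:]
	dataLen, n := binary.Uvarint(cb)
	if n < 1 {
		return nil, errors.New("reading chunk length failed")
	}
	// n varint bytes + 1 encoding byte + dataLen data bytes (crc32 is ignored).
	chLen := n + 1 + int(dataLen)
	if len(cb) >= chLen {
		// The preloaded range covers the whole chunk: slice it out directly.
		return cb[n:chLen], nil
	}
	// Preloaded range was cut short (in the patch this can only happen for the
	// last chunk of a fetched range); fetch the entire chunk in one extra read.
	full, err := read(off, chLen)
	if err != nil {
		return nil, fmt.Errorf("re-fetching %d byte chunk: %w", chLen, err)
	}
	if len(full) != chLen {
		return nil, fmt.Errorf("chunk too small, expecting %d bytes", chLen)
	}
	return full[n:], nil
}

func main() {
	// Fake 3-byte chunk at segment offset 100: length=3, encoding=1, data=aa bb cc.
	segment := map[int][]byte{100: {0x03, 0x01, 0xaa, 0xbb, 0xcc, 0, 0, 0, 0}}
	read := func(off, length int) ([]byte, error) { return segment[off][:length], nil }

	// Preload only the first 2 bytes so the sketch has to take the re-fetch path.
	c, err := chunkAt(segment[100][:2], 100, 100, read)
	fmt.Println(c, err) // [1 170 187 204] <nil>
}

The same reasoning explains why the patch drops the mutex while re-fetching: the extra object-store read is slow and independent of the shared maps, so the lock is only re-acquired to record the fetched bytes and stats.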