Fetching large chunks (#2956)
* Added unit test showing issue with large chunk in the block.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Make sure that large chunk is indeed created.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* If chunk is not fully fetched, fetch remaining data.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Added CHANGELOG.md entry.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Removed extra err parameter.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Removed extra t.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Removed extra tb.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Reorder bucketChunkReader to show which fields mutex protects.
Also document mutex usage.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pstibrany authored Jul 30, 2020
1 parent 2565a7c commit ee52915
Showing 3 changed files with 186 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks.
- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block.
- [#2936](https://github.com/thanos-io/thanos/pull/2936) Compact: Fix ReplicaLabelRemover panic when replicaLabels are not specified.
+- [#2956](https://github.com/thanos-io/thanos/pull/2956) Store: Fix fetching of chunks bigger than 16000 bytes.

### Added

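The changelog entry above is terse, so here is a hedged illustration of the failure mode it fixes. Thanos preloads chunk data in ranges computed from chunk offsets plus an estimated maximum chunk size (the entry puts the threshold at 16000 bytes); a chunk encoded larger than that estimate could extend past the preloaded range. The sketch below is illustration only: `estimatedMaxChunkSize` and `preloadRange` are invented names, and the real computation also merges nearby ranges via a partitioner, as a comment in the bucket.go diff notes.

```go
package main

import "fmt"

// estimatedMaxChunkSize stands for the fixed per-chunk upper bound assumed
// when deciding how many bytes to preload. Treating it as a plain constant
// of 16000 bytes is an assumption made for this illustration.
const estimatedMaxChunkSize = 16000

// preloadRange computes the byte range [start, end) to fetch for a group of
// chunks starting at the given sorted offsets within one segment file.
// Every chunk except the last is bounded by the next chunk's offset, so only
// the last chunk's size has to be guessed.
func preloadRange(offsets []uint32) (start, end uint32) {
    return offsets[0], offsets[len(offsets)-1] + estimatedMaxChunkSize
}

func main() {
    // Two chunks at offsets 8 and 1200; suppose the second is 50000 bytes long.
    start, end := preloadRange([]uint32{8, 1200})
    fmt.Println(start, end)        // 8 17200
    fmt.Println(end-1200 >= 50000) // false: the last chunk is only partially preloaded
}
```

Only the last chunk of a fetched group can be cut off in this way, which is why the fix in bucket.go handles the shortfall with a targeted second read rather than changing how ranges are planned.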
66 changes: 54 additions & 12 deletions pkg/store/bucket.go
@@ -1966,14 +1966,15 @@ func (r *bucketIndexReader) Close() error {
type bucketChunkReader struct {
    ctx   context.Context
    block *bucketBlock
-   stats *queryStats

    preloads [][]uint32
-   mtx      sync.Mutex
-   chunks   map[uint64]chunkenc.Chunk

-   // Byte slice to return to the chunk pool on close.
-   chunkBytes []*[]byte
+   // Mutex protects access to following fields, when updated from chunks-loading goroutines.
+   // After chunks are loaded, mutex is no longer used.
+   mtx        sync.Mutex
+   chunks     map[uint64]chunkenc.Chunk
+   stats      *queryStats
+   chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.
}

func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader {
@@ -2026,21 +2027,29 @@ func (r *bucketChunkReader) preload() error {
    return g.Wait()
}

+// loadChunks will read range [start, end] from the segment file with sequence number seq.
+// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error {
-   begin := time.Now()
+   fetchBegin := time.Now()

    b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start))
    if err != nil {
        return errors.Wrapf(err, "read range for %d", seq)
    }

+   locked := true
    r.mtx.Lock()
-   defer r.mtx.Unlock()
+
+   defer func() {
+       if locked {
+           r.mtx.Unlock()
+       }
+   }()

    r.chunkBytes = append(r.chunkBytes, b)
    r.stats.chunksFetchCount++
    r.stats.chunksFetched += len(offs)
-   r.stats.chunksFetchDurationSum += time.Since(begin)
+   r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
    r.stats.chunksFetchedSizeSum += int(end - start)

    for _, o := range offs {
@@ -2050,11 +2059,44 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
        if n < 1 {
            return errors.New("reading chunk length failed")
        }
-       if len(cb) < n+int(l)+1 {
-           return errors.Errorf("preloaded chunk too small, expecting %d", n+int(l)+1)
+
+       chunkRef := uint64(seq<<32) | uint64(o)
+
+       // Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data.
+       // There is also crc32 after the chunk, but we ignore that.
+       chLen := n + 1 + int(l)
+       if len(cb) >= chLen {
+           r.chunks[chunkRef] = rawChunk(cb[n:chLen])
+           continue
+       }
+
+       // If we didn't fetch enough data for the chunk, fetch more. This can only really happen for last
+       // chunk in the list of fetched chunks, otherwise partitioner would merge fetch ranges together.
+       r.mtx.Unlock()
+       locked = false
+
+       fetchBegin = time.Now()
+
+       // Read entire chunk into new buffer.
+       nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen))
+       if err != nil {
+           return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen)
        }
-       cid := uint64(seq<<32) | uint64(o)
-       r.chunks[cid] = rawChunk(cb[n : n+int(l)+1])
+
+       cb = *nb
+       if len(cb) != chLen {
+           return errors.Errorf("preloaded chunk too small, expecting %d", chLen)
+       }
+
+       r.mtx.Lock()
+       locked = true
+
+       r.chunkBytes = append(r.chunkBytes, nb)
+       r.stats.chunksFetchCount++
+       r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
+       r.stats.chunksFetchedSizeSum += len(cb)
+
+       r.chunks[chunkRef] = rawChunk(cb[n:])
    }
    return nil
}
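To make the new control flow easier to follow outside the diff, below is a minimal, self-contained sketch of the pattern loadChunks now implements. It is not the Thanos code: `chunkFrame`, `parseChunk`, `loadChunkAt`, and `readRange` are invented names for this example. The idea is to decode the `<uvarint length><1-byte encoding><data>` framing (the trailing crc32 is ignored, as in the diff's comment) from the preloaded bytes, and to issue one extra read sized exactly to the chunk when the preloaded slice turns out to be too short.

```go
package main

import (
    "encoding/binary"
    "fmt"
)

// chunkFrame mirrors the on-disk framing described in the diff's comment:
// <uvarint data length> <1 byte encoding> <data> <4 byte crc32 (ignored here)>.
type chunkFrame struct {
    enc  byte
    data []byte
}

// parseChunk tries to decode one chunk starting at the beginning of buf.
// On success it returns the frame, its total encoded length, and true.
// If buf is too short it returns false together with the length a caller
// would need to fetch (0 if even the length prefix is unreadable).
func parseChunk(buf []byte) (chunkFrame, int, bool) {
    l, n := binary.Uvarint(buf)
    if n < 1 {
        return chunkFrame{}, 0, false
    }
    chLen := n + 1 + int(l) // length prefix + encoding byte + data
    if len(buf) < chLen {
        return chunkFrame{}, chLen, false
    }
    return chunkFrame{enc: buf[n], data: buf[n+1 : chLen]}, chLen, true
}

// loadChunkAt shows the re-fetch pattern from the commit: try the preloaded
// bytes first, and if they do not cover the whole chunk, read the full chunk
// directly. readRange stands in for an object-storage "get range" call.
func loadChunkAt(preloaded []byte, off int, readRange func(start, length int) []byte) (chunkFrame, error) {
    c, chLen, ok := parseChunk(preloaded)
    if ok {
        return c, nil
    }
    if chLen == 0 {
        return chunkFrame{}, fmt.Errorf("cannot read chunk length at offset %d", off)
    }
    // Preloaded range was too short: fetch the entire chunk in one extra read.
    full := readRange(off, chLen)
    c, _, ok = parseChunk(full)
    if !ok {
        return chunkFrame{}, fmt.Errorf("chunk at offset %d still incomplete after re-fetch", off)
    }
    return c, nil
}

func main() {
    // Build a fake "segment": one chunk whose data is 20 zero bytes.
    data := make([]byte, 20)
    var lenBuf [binary.MaxVarintLen32]byte
    n := binary.PutUvarint(lenBuf[:], uint64(len(data)))
    seg := append(append(lenBuf[:n:n], 0x01), data...) // 0x01 = pretend encoding byte

    // Preload only the first 10 bytes, as if the estimated range was too small.
    c, err := loadChunkAt(seg[:10], 0, func(start, length int) []byte {
        return seg[start : start+length]
    })
    fmt.Println(len(c.data), err) // 20 <nil>
}
```

In the real code the extra read is done with the reader's mutex released and the fetch statistics are updated for it separately, as the diff shows.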
131 changes: 131 additions & 0 deletions pkg/store/bucket_test.go
@@ -38,6 +38,7 @@ import (
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb"
    "github.com/prometheus/prometheus/tsdb/encoding"
+
    "github.com/thanos-io/thanos/pkg/block"
    "github.com/thanos-io/thanos/pkg/block/indexheader"
    "github.com/thanos-io/thanos/pkg/block/metadata"
@@ -1729,3 +1730,133 @@ func TestBigEndianPostingsCount(t *testing.T) {
    }
    testutil.Equals(t, count, c)
}
+
+func TestBlockWithLargeChunks(t *testing.T) {
+   tmpDir, err := ioutil.TempDir(os.TempDir(), "large-chunk-test")
+   testutil.Ok(t, err)
+   t.Cleanup(func() {
+       _ = os.RemoveAll(tmpDir)
+   })
+
+   blockDir := filepath.Join(tmpDir, "block")
+   b := createBlockWithLargeChunk(testutil.NewTB(t), blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0)))
+
+   thanosMeta := metadata.Thanos{
+       Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
+       Downsample: metadata.ThanosDownsample{Resolution: 0},
+       Source:     metadata.TestSource,
+   }
+
+   _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, b.String()), thanosMeta, nil)
+   testutil.Ok(t, err)
+
+   bucketDir := filepath.Join(os.TempDir(), "bkt")
+   bkt, err := filesystem.NewBucket(bucketDir)
+   testutil.Ok(t, err)
+   t.Cleanup(func() {
+       _ = os.RemoveAll(bucketDir)
+   })
+
+   logger := log.NewNopLogger()
+   instrBkt := objstore.WithNoopInstr(bkt)
+
+   testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, b.String())))
+
+   // Instance a real bucket store we'll use to query the series.
+   fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil)
+   testutil.Ok(t, err)
+
+   indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
+   testutil.Ok(t, err)
+
+   store, err := NewBucketStore(
+       logger,
+       nil,
+       instrBkt,
+       fetcher,
+       tmpDir,
+       indexCache,
+       nil,
+       1000000,
+       NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
+       false,
+       10,
+       nil,
+       false,
+       true,
+       DefaultPostingOffsetInMemorySampling,
+       true,
+   )
+   testutil.Ok(t, err)
+   testutil.Ok(t, store.SyncBlocks(context.Background()))
+
+   req := &storepb.SeriesRequest{
+       MinTime: math.MinInt64,
+       MaxTime: math.MaxInt64,
+       Matchers: []storepb.LabelMatcher{
+           {Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "test"},
+       },
+   }
+   srv := newStoreSeriesServer(context.Background())
+   testutil.Ok(t, store.Series(req, srv))
+   testutil.Equals(t, 1, len(srv.SeriesSet))
+}
+
+// This method relies on a bug in TSDB Compactor which will just merge overlapping chunks into one big chunk.
+// If compactor is fixed in the future, we may need a different way of generating the block, or commit
+// existing block to the repository.
+func createBlockWithLargeChunk(t testutil.TB, dir string, lbls labels.Labels, random *rand.Rand) ulid.ULID {
+   // Block covering time [0 ... 10000)
+   b1 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 10000, random, 1)
+
+   // This block has only 11 samples that fit into one chunk, but it completely overlaps entire first block.
+   // Last sample has higher timestamp than last sample in b1.
+   // This will make compactor to merge all chunks into one.
+   b2 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 11, random, 1000)
+
+   // Merge the blocks together.
+   compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
+   testutil.Ok(t, err)
+
+   blocksToCompact := []string{filepath.Join(dir, b1.String()), filepath.Join(dir, b2.String())}
+   newBlock, err := compactor.Compact(dir, blocksToCompact, nil)
+   testutil.Ok(t, err)
+
+   for _, b := range blocksToCompact {
+       err := os.RemoveAll(b)
+       testutil.Ok(t, err)
+   }
+
+   db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions())
+   testutil.Ok(t, err)
+   bs := db.Blocks()
+   testutil.Equals(t, 1, len(bs))
+   cr, err := bs[0].Chunks()
+   testutil.Ok(t, err)
+   // Ref is (<segment file index> << 32 + offset in the file). In TSDB v1 first chunk is always at offset 8.
+   c, err := cr.Chunk(8)
+   testutil.Ok(t, err)
+
+   // Make sure that this is really a big chunk, otherwise this method makes a false promise.
+   testutil.Equals(t, 10001, c.NumSamples())
+
+   return newBlock
+}
+
+func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand, step int64) ulid.ULID {
+   h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples)*step, dir, nil, tsdb.DefaultStripeSize, nil)
+   testutil.Ok(t, err)
+   defer func() { testutil.Ok(t, h.Close()) }()
+
+   app := h.Appender()
+
+   ts := int64(blockIndex * totalSamples)
+   ref, err := app.Add(lbls, ts, random.Float64())
+   testutil.Ok(t, err)
+   for i := 1; i < totalSamples; i++ {
+       testutil.Ok(t, app.AddFast(ref, ts+step*int64(i), random.Float64()))
+   }
+   testutil.Ok(t, app.Commit())
+
+   return createBlockFromHead(t, dir, h)
+}

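A small aside on the test helper above: the comment "Ref is (<segment file index> << 32 + offset in the file)" is the same packing the bucket.go diff uses for `chunkRef`. The toy helpers below spell it out; `packChunkRef` and `unpackChunkRef` are invented names rather than a TSDB or Thanos API, and the reason `cr.Chunk(8)` addresses the first chunk is that a TSDB chunk segment file starts with an 8-byte header (4-byte magic, 1-byte version, 3 bytes padding).

```go
package main

import "fmt"

// packChunkRef combines a segment file sequence number and a byte offset
// within that file into a single uint64 reference: sequence in the upper
// 32 bits, offset in the lower 32 bits.
func packChunkRef(seq int, offset uint32) uint64 {
    return uint64(seq)<<32 | uint64(offset)
}

// unpackChunkRef splits a reference back into its two parts.
func unpackChunkRef(ref uint64) (seq int, offset uint32) {
    return int(ref >> 32), uint32(ref)
}

func main() {
    // First chunk of segment file 0 sits right after the 8-byte header.
    ref := packChunkRef(0, 8)
    fmt.Println(ref) // 8, which is the value the test passes to cr.Chunk

    seq, off := unpackChunkRef(ref)
    fmt.Println(seq, off) // 0 8
}
```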