Fetching large chunks #2956

Merged (8 commits, Jul 30, 2020)
Changes from all commits
CHANGELOG.md (1 addition, 0 deletions)

@@ -21,6 +21,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks.
 - [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block.
 - [#2936](https://github.com/thanos-io/thanos/pull/2936) Compact: Fix ReplicaLabelRemover panic when replicaLabels are not specified.
+- [#2956](https://github.com/thanos-io/thanos/pull/2956) Store: Fix fetching of chunks bigger than 16000 bytes.

 ### Added
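
For context on the #2956 entry: the store gateway sizes its chunk fetch ranges using a fixed worst-case estimate of 16000 bytes per chunk, so a chunk whose encoded length exceeds that estimate used to arrive truncated and fail to decode. Below is a minimal sketch of the failure mode and the re-fetch remedy this PR introduces; the constant and variable names are illustrative assumptions, not Thanos identifiers:

```go
package main

import "fmt"

// maxChunkSizeEstimate mirrors the fixed 16000-byte worst-case estimate
// referenced by this PR's changelog entry; the name is an assumption.
const maxChunkSizeEstimate = 16000

func main() {
	chunkOffset := int64(123456) // offset of the last chunk in a fetched range
	actualChunkLen := 25000      // a "large" chunk, e.g. produced by compacting overlapping blocks

	// Initially only the estimated range is available for the last chunk.
	fetched := maxChunkSizeEstimate
	if fetched < actualChunkLen {
		// Before this PR: decoding failed with "preloaded chunk too small".
		// After this PR: the reader re-fetches [chunkOffset, chunkOffset+actualChunkLen).
		fmt.Printf("chunk at offset %d truncated (%d of %d bytes), re-fetching full chunk\n",
			chunkOffset, fetched, actualChunkLen)
	}
}
```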
pkg/store/bucket.go (54 additions, 12 deletions)

@@ -1966,14 +1966,15 @@ func (r *bucketIndexReader) Close() error {
 type bucketChunkReader struct {
 	ctx   context.Context
 	block *bucketBlock
-	stats *queryStats
 
 	preloads [][]uint32
-	mtx    sync.Mutex
-	chunks map[uint64]chunkenc.Chunk
-
-	// Byte slice to return to the chunk pool on close.
-	chunkBytes []*[]byte
+
+	// Mutex protects access to following fields, when updated from chunks-loading goroutines.
+	// After chunks are loaded, mutex is no longer used.
+	mtx        sync.Mutex
+	chunks     map[uint64]chunkenc.Chunk
+	stats      *queryStats
+	chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.
 }
 
 func newBucketChunkReader(ctx context.Context, block *bucketBlock) *bucketChunkReader {
@@ -2026,21 +2027,29 @@ func (r *bucketChunkReader) preload() error {
 	return g.Wait()
 }
 
+// loadChunks will read range [start, end] from the segment file with sequence number seq.
+// This data range covers chunks starting at supplied offsets.
 func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error {
-	begin := time.Now()
+	fetchBegin := time.Now()
 
 	b, err := r.block.readChunkRange(ctx, seq, int64(start), int64(end-start))
 	if err != nil {
 		return errors.Wrapf(err, "read range for %d", seq)
 	}
 
+	locked := true
 	r.mtx.Lock()
-	defer r.mtx.Unlock()
+
+	defer func() {
+		if locked {
+			r.mtx.Unlock()
+		}
+	}()
 
 	r.chunkBytes = append(r.chunkBytes, b)
 	r.stats.chunksFetchCount++
 	r.stats.chunksFetched += len(offs)
-	r.stats.chunksFetchDurationSum += time.Since(begin)
+	r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
 	r.stats.chunksFetchedSizeSum += int(end - start)
 
 	for _, o := range offs {
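
The locked flag with a deferred conditional unlock introduced above is the pattern that lets loadChunks drop the mutex around a slow re-fetch while still guaranteeing a final unlock on every return path. A distilled, self-contained sketch of the pattern (illustrative names, not Thanos code):

```go
package main

import (
	"fmt"
	"sync"
)

// appendWithSlowFetch mutates shared state under mu, but releases the lock
// around a slow operation; the deferred conditional unlock releases mu
// exactly once no matter where the function returns.
func appendWithSlowFetch(mu *sync.Mutex, shared *[]int) error {
	locked := true
	mu.Lock()
	defer func() {
		if locked {
			mu.Unlock()
		}
	}()

	*shared = append(*shared, 1) // fast path, under the lock

	mu.Unlock() // do not hold the lock during the slow part
	locked = false

	v := 42 // stand-in for a slow network fetch

	mu.Lock()
	locked = true
	*shared = append(*shared, v)
	return nil
}

func main() {
	var mu sync.Mutex
	var shared []int
	if err := appendWithSlowFetch(&mu, &shared); err == nil {
		fmt.Println(shared) // [1 42]
	}
}
```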
@@ -2050,11 +2059,44 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq int, start, end uint32) error {
 		if n < 1 {
 			return errors.New("reading chunk length failed")
 		}
-		if len(cb) < n+int(l)+1 {
-			return errors.Errorf("preloaded chunk too small, expecting %d", n+int(l)+1)
-		}
-		cid := uint64(seq<<32) | uint64(o)
-		r.chunks[cid] = rawChunk(cb[n : n+int(l)+1])
+
+		chunkRef := uint64(seq<<32) | uint64(o)
+
+		// Chunk length is n (number of bytes used to encode chunk data), 1 for chunk encoding and l for actual chunk data.
+		// There is also crc32 after the chunk, but we ignore that.
+		chLen := n + 1 + int(l)
+		if len(cb) >= chLen {
+			r.chunks[chunkRef] = rawChunk(cb[n:chLen])
+			continue
+		}
+
+		// If we didn't fetch enough data for the chunk, fetch more. This can only really happen for last
+		// chunk in the list of fetched chunks, otherwise partitioner would merge fetch ranges together.
+		r.mtx.Unlock()

[Inline review thread on the r.mtx.Unlock() line above]

Member: Worth mentioning what this mutex is saving us from. Renaming it would probably be nice too. I know it was like this before, but maybe we can improve that part. I assume it's for stats, right?

Contributor (author): It's both for stats and for storing chunks. I will document and rename it.

Contributor (author): Also chunkBytes.

+		locked = false
+
+		fetchBegin = time.Now()
+
+		// Read entire chunk into new buffer.
+		nb, err := r.block.readChunkRange(ctx, seq, int64(o), int64(chLen))
+		if err != nil {
+			return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chLen)
+		}
+
+		cb = *nb
+		if len(cb) != chLen {
+			return errors.Errorf("preloaded chunk too small, expecting %d", chLen)
+		}
+
+		r.mtx.Lock()
+		locked = true
+
+		r.chunkBytes = append(r.chunkBytes, nb)
+		r.stats.chunksFetchCount++
+		r.stats.chunksFetchDurationSum += time.Since(fetchBegin)
+		r.stats.chunksFetchedSizeSum += len(cb)
+
+		r.chunks[chunkRef] = rawChunk(cb[n:])
 	}
 	return nil
 }
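
To make the chLen arithmetic concrete: each chunk in a segment file is laid out as <uvarint data length> <1 byte encoding> <data> <4 byte crc32>, per the new comment above. Below is a self-contained sketch of parsing that layout; it is an illustrative re-implementation of the scheme, not the Thanos code (parseChunk is a hypothetical helper):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseChunk splits one chunk out of a byte slice that starts at the chunk's
// offset, following the layout <uvarint len><1 byte encoding><data><crc32>.
func parseChunk(cb []byte) (enc byte, data []byte, err error) {
	l, n := binary.Uvarint(cb) // l = data length, n = bytes consumed by the uvarint
	if n < 1 {
		return 0, nil, errors.New("reading chunk length failed")
	}
	chLen := n + 1 + int(l) // uvarint + encoding byte + data; crc32 follows and is ignored here
	if len(cb) < chLen {
		return 0, nil, fmt.Errorf("chunk too small, expecting %d bytes, have %d", chLen, len(cb))
	}
	return cb[n], cb[n+1 : chLen], nil
}

func main() {
	// Build a tiny fake chunk: data length 3, encoding byte 1, 3 data bytes, 4 crc bytes.
	var tmp [binary.MaxVarintLen64]byte
	buf := append(tmp[:binary.PutUvarint(tmp[:], 3)], 1, 0xAA, 0xBB, 0xCC, 0, 0, 0, 0)

	enc, data, err := parseChunk(buf)
	fmt.Println(enc, data, err) // 1 [170 187 204] <nil>
}
```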
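Relatedly, the chunkRef key packs the segment file sequence number into the upper 32 bits and the in-file byte offset into the lower 32 bits, mirroring uint64(seq<<32) | uint64(o) above. A standalone sketch of the scheme (helper names are ours, not Thanos identifiers):

```go
package main

import "fmt"

// packChunkRef combines a segment file sequence number (upper 32 bits) with
// a byte offset inside that file (lower 32 bits) into one map key.
func packChunkRef(seq int, offset uint32) uint64 {
	return uint64(seq)<<32 | uint64(offset)
}

// unpackChunkRef recovers the sequence number and offset from a ref.
func unpackChunkRef(ref uint64) (seq int, offset uint32) {
	return int(ref >> 32), uint32(ref)
}

func main() {
	ref := packChunkRef(3, 123456)
	seq, off := unpackChunkRef(ref)
	fmt.Println(ref, seq, off) // 12885025344 3 123456
}
```

This is also why the test below can address the first chunk as cr.Chunk(8): segment index 0 with offset 8 packs to ref 8.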
pkg/store/bucket_test.go (131 additions, 0 deletions)

@@ -39,6 +39,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/encoding"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -1729,3 +1730,133 @@ func TestBigEndianPostingsCount(t *testing.T) {
	}
	testutil.Equals(t, count, c)
}

func TestBlockWithLargeChunks(t *testing.T) {
	tmpDir, err := ioutil.TempDir(os.TempDir(), "large-chunk-test")
	testutil.Ok(t, err)
	t.Cleanup(func() {
		_ = os.RemoveAll(tmpDir)
	})

	blockDir := filepath.Join(tmpDir, "block")
	b := createBlockWithLargeChunk(testutil.NewTB(t), blockDir, labels.FromStrings("__name__", "test"), rand.New(rand.NewSource(0)))

	thanosMeta := metadata.Thanos{
		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
		Downsample: metadata.ThanosDownsample{Resolution: 0},
		Source:     metadata.TestSource,
	}

	_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, b.String()), thanosMeta, nil)
	testutil.Ok(t, err)

	bucketDir := filepath.Join(os.TempDir(), "bkt")
	bkt, err := filesystem.NewBucket(bucketDir)
	testutil.Ok(t, err)
	t.Cleanup(func() {
		_ = os.RemoveAll(bucketDir)
	})

	logger := log.NewNopLogger()
	instrBkt := objstore.WithNoopInstr(bkt)

	testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, b.String())))

	// Instantiate a real bucket store we'll use to query the series.
	fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil, nil)
	testutil.Ok(t, err)

	indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{})
	testutil.Ok(t, err)

	store, err := NewBucketStore(
		logger,
		nil,
		instrBkt,
		fetcher,
		tmpDir,
		indexCache,
		nil,
		1000000,
		NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
		false,
		10,
		nil,
		false,
		true,
		DefaultPostingOffsetInMemorySampling,
		true,
	)
	testutil.Ok(t, err)
	testutil.Ok(t, store.SyncBlocks(context.Background()))

	req := &storepb.SeriesRequest{
		MinTime: math.MinInt64,
		MaxTime: math.MaxInt64,
		Matchers: []storepb.LabelMatcher{
			{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "test"},
		},
	}
	srv := newStoreSeriesServer(context.Background())
	testutil.Ok(t, store.Series(req, srv))
	testutil.Equals(t, 1, len(srv.SeriesSet))
}

// This method relies on a bug in the TSDB Compactor, which will just merge overlapping chunks into one big chunk.
// If the compactor is fixed in the future, we may need a different way of generating the block, or to commit an
// existing block to the repository.
func createBlockWithLargeChunk(t testutil.TB, dir string, lbls labels.Labels, random *rand.Rand) ulid.ULID {
	// Block covering time [0 ... 10000).
	b1 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 10000, random, 1)

	// This block has only 11 samples that fit into one chunk, but it completely overlaps the entire first block.
	// Its last sample has a higher timestamp than the last sample in b1.
	// This will make the compactor merge all chunks into one.
	b2 := createBlockWithOneSeriesWithStep(t, dir, lbls, 0, 11, random, 1000)

	// Merge the blocks together.
	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil)
	testutil.Ok(t, err)

	blocksToCompact := []string{filepath.Join(dir, b1.String()), filepath.Join(dir, b2.String())}
	newBlock, err := compactor.Compact(dir, blocksToCompact, nil)
	testutil.Ok(t, err)

	for _, b := range blocksToCompact {
		err := os.RemoveAll(b)
		testutil.Ok(t, err)
	}

	db, err := tsdb.Open(dir, nil, nil, tsdb.DefaultOptions())
	testutil.Ok(t, err)
	bs := db.Blocks()
	testutil.Equals(t, 1, len(bs))
	cr, err := bs[0].Chunks()
	testutil.Ok(t, err)
	// Ref is (<segment file index> << 32 + offset in the file). In TSDB v1 the first chunk is always at offset 8.
	c, err := cr.Chunk(8)
	testutil.Ok(t, err)

	// Make sure that this is really a big chunk, otherwise this method makes a false promise.
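	// b1 contributes 10000 samples (ts 0..9999, step 1) and b2 contributes 11 samples
	// (ts 0, 1000, ..., 10000, step 1000); 10 timestamps coincide, so the merged chunk
	// holds 10000 + 11 - 10 = 10001 samples.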
	testutil.Equals(t, 10001, c.NumSamples())

	return newBlock
}

func createBlockWithOneSeriesWithStep(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand, step int64) ulid.ULID {
	h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples)*step, dir, nil, tsdb.DefaultStripeSize, nil)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, h.Close()) }()

	app := h.Appender()

	ts := int64(blockIndex * totalSamples)
	ref, err := app.Add(lbls, ts, random.Float64())
	testutil.Ok(t, err)
	for i := 1; i < totalSamples; i++ {
		testutil.Ok(t, app.AddFast(ref, ts+step*int64(i), random.Float64()))
	}
	testutil.Ok(t, app.Commit())

	return createBlockFromHead(t, dir, h)
}
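
As a final note, the loadChunks comment above leans on a partitioner guarantee: only the last chunk of a fetched range can be under-read, because nearby byte ranges are merged before fetching. A generic sketch of gap-based range partitioning under that assumption (illustrative; not the exact Thanos partitioner):

```go
package main

import "fmt"

type byteRange struct{ start, end uint32 }

// partitionRanges merges [start, end) ranges whose gap is at most maxGap,
// so nearby chunks are fetched with a single object-storage request.
// Input must be sorted by start offset.
func partitionRanges(ranges []byteRange, maxGap uint32) []byteRange {
	if len(ranges) == 0 {
		return nil
	}
	out := []byteRange{ranges[0]}
	for _, r := range ranges[1:] {
		last := &out[len(out)-1]
		if r.start <= last.end+maxGap {
			if r.end > last.end { // extend the merged range
				last.end = r.end
			}
			continue
		}
		out = append(out, r) // gap too large, start a new fetch range
	}
	return out
}

func main() {
	rs := []byteRange{{0, 100}, {150, 300}, {10000, 10100}}
	fmt.Println(partitionRanges(rs, 512)) // [{0 300} {10000 10100}]
}
```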