diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index e0740c7b66..db2a280c21 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -172,6 +172,8 @@ type BucketStore struct {
 	debugLogging bool
 	// Number of goroutines to use when syncing blocks from object storage.
 	blockSyncConcurrency int
+
+	partitioner partitioner
 }
 
 // NewBucketStore creates a new bucket backed store that implements the store API against
@@ -197,6 +199,9 @@ func NewBucketStore(
 	if err != nil {
 		return nil, errors.Wrap(err, "create chunk pool")
 	}
+
+	const maxGapSize = 512 * 1024
+
 	s := &BucketStore{
 		logger:               logger,
 		bucket:               bucket,
@@ -207,6 +212,7 @@ func NewBucketStore(
 		blockSets:            map[uint64]*bucketBlockSet{},
 		debugLogging:         debugLogging,
 		blockSyncConcurrency: blockSyncConcurrency,
+		partitioner:          gapBasedPartitioner{maxGapSize: maxGapSize},
 	}
 	s.metrics = newBucketStoreMetrics(reg)
 
@@ -353,6 +359,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
 		dir,
 		s.indexCache,
 		s.chunkPool,
+		s.partitioner,
 	)
 	if err != nil {
 		return errors.Wrap(err, "new bucket block")
@@ -981,6 +988,8 @@ type bucketBlock struct {
 	chunkObjs []string
 
 	pendingReaders sync.WaitGroup
+
+	partitioner partitioner
 }
 
 func newBucketBlock(
@@ -991,14 +1000,16 @@ func newBucketBlock(
 	dir string,
 	indexCache *indexCache,
 	chunkPool *pool.BytesPool,
+	p partitioner,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
-		logger:     logger,
-		bucket:     bkt,
-		indexObj:   path.Join(id.String(), block.IndexFilename),
-		indexCache: indexCache,
-		chunkPool:  chunkPool,
-		dir:        dir,
+		logger:      logger,
+		bucket:      bkt,
+		indexObj:    path.Join(id.String(), block.IndexFilename),
+		indexCache:  indexCache,
+		chunkPool:   chunkPool,
+		dir:         dir,
+		partitioner: p,
 	}
 	if err = b.loadMeta(ctx, id); err != nil {
 		return nil, errors.Wrap(err, "load meta")
@@ -1243,8 +1254,6 @@ type postingPtr struct {
 
 // fetchPostings returns sorted slice of postings that match the selected labels.
 func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, error) {
-	const maxGapSize = 512 * 1024
-
 	var (
 		ptrs     []postingPtr
 		postings = make([]index.Postings, 0, len(keys))
@@ -1282,9 +1291,9 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, e
 	// TODO(bwplotka): Asses how large in worst case scenario this can be. (e.g fetch for AllPostingsKeys)
 	// Consider sub split if too big.
-	parts := partitionRanges(len(ptrs), func(i int) (start, end uint64) {
+	parts := r.block.partitioner.Partition(len(ptrs), func(i int) (start, end uint64) {
 		return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
-	}, maxGapSize)
+	})
 
 	var g run.Group
 	for _, p := range parts {
@@ -1346,7 +1355,6 @@ func (r *bucketIndexReader) fetchPostings(keys labels.Labels) (index.Postings, e
 
 func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
 	const maxSeriesSize = 64 * 1024
-	const maxGapSize = 512 * 1024
 
 	var newIDs []uint64
@@ -1359,17 +1367,18 @@ func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
 	}
 	ids = newIDs
 
-	parts := partitionRanges(len(ids), func(i int) (start, end uint64) {
+	parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
 		return ids[i], ids[i] + maxSeriesSize
-	}, maxGapSize)
+	})
 	var g run.Group
 	for _, p := range parts {
 		ctx, cancel := context.WithCancel(r.ctx)
+		s, e := p.start, p.end
 		i, j := p.elemRng[0], p.elemRng[1]
 
 		g.Add(func() error {
-			return r.loadSeries(ctx, ids[i:j], p.start, p.end+maxSeriesSize)
+			return r.loadSeries(ctx, ids[i:j], s, e)
 		}, func(err error) {
 			if err != nil {
 				cancel()
@@ -1419,12 +1428,22 @@ type part struct {
 	elemRng [2]int
 }
 
-// partitionRanges partitions length entries into n <= length ranges that cover all
-// input ranges.
-// It combines entries that are separated by reasonably small gaps.
-// It supports overlapping ranges.
-// NOTE: It expects range to be sorted by start time.
-func partitionRanges(length int, rng func(int) (uint64, uint64), maxGapSize uint64) (parts []part) {
+type partitioner interface {
+	// Partition partitions length entries into n <= length ranges that cover all
+	// input ranges.
+	// It supports overlapping ranges.
+	// NOTE: It expects ranges to be sorted by start time.
+	Partition(length int, rng func(int) (uint64, uint64)) []part
+}
+
+type gapBasedPartitioner struct {
+	maxGapSize uint64
+}
+
+// Partition partitions length entries into n <= length ranges that cover all
+// input ranges by combining entries that are separated by reasonably small gaps.
+// It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones.
+func (g gapBasedPartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) {
 	j := 0
 	k := 0
 	for k < length {
@@ -1438,7 +1457,7 @@ func partitionRanges(length int, rng func(int) (uint64, uint64), maxGapSize uint
 		for ; k < length; k++ {
 			s, e := rng(k)
 
-			if p.end+maxGapSize < s {
+			if p.end+g.maxGapSize < s {
 				break
 			}
 
@@ -1521,7 +1540,6 @@ func (r *bucketChunkReader) addPreload(id uint64) error {
 
 // preload all added chunk IDs. Must be called before the first call to Chunk is made.
 func (r *bucketChunkReader) preload() error {
 	const maxChunkSize = 16000
-	const maxGapSize = 512 * 1024
 
 	var g run.Group
 
@@ -1529,19 +1547,20 @@ func (r *bucketChunkReader) preload() error {
 		sort.Slice(offsets, func(i, j int) bool {
 			return offsets[i] < offsets[j]
 		})
-		parts := partitionRanges(len(offsets), func(i int) (start, end uint64) {
+		parts := r.block.partitioner.Partition(len(offsets), func(i int) (start, end uint64) {
 			return uint64(offsets[i]), uint64(offsets[i]) + maxChunkSize
-		}, maxGapSize)
+		})
 
 		seq := seq
 		offsets := offsets
 
 		for _, p := range parts {
 			ctx, cancel := context.WithCancel(r.ctx)
+			s, e := uint32(p.start), uint32(p.end)
 			m, n := p.elemRng[0], p.elemRng[1]
 
 			g.Add(func() error {
-				return r.loadChunks(ctx, offsets[m:n], seq, uint32(p.start), uint32(p.end)+maxChunkSize)
+				return r.loadChunks(ctx, offsets[m:n], seq, s, e)
 			}, func(err error) {
 				if err != nil {
 					cancel()
@@ -1559,11 +1578,11 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, offs []uint32, seq i
 	if err != nil {
 		return errors.Wrapf(err, "read range for %d", seq)
 	}
-	r.chunkBytes = append(r.chunkBytes, b)
 
 	r.mtx.Lock()
 	defer r.mtx.Unlock()
 
+	r.chunkBytes = append(r.chunkBytes, b)
 	r.stats.chunksFetchCount++
 	r.stats.chunksFetched += len(offs)
 	r.stats.chunksFetchDurationSum += time.Since(begin)
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 28122ac367..ca67956644 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -35,7 +35,7 @@ func (s *storeSuite) Close() {
 	s.wg.Wait()
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -92,6 +92,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *
 
 	s.store = store
 
+	if manyParts {
+		s.store.partitioner = naivePartitioner{}
+	}
+
 	s.wg.Add(1)
 	go func() {
 		defer s.wg.Done()
@@ -117,6 +121,95 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket) *
 	return s
 }
 
+func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) {
+	mint, maxt := s.store.TimeRange()
+	testutil.Equals(t, s.minTime, mint)
+	testutil.Equals(t, s.maxTime, maxt)
+
+	vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{Label: "a"})
+	testutil.Ok(t, err)
+	testutil.Equals(t, []string{"1", "2"}, vals.Values)
+
+	for i, tcase := range []struct {
+		req      *storepb.SeriesRequest
+		expected [][]storepb.Label
+	}{
+		{
+			req: &storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
+				},
+				MinTime: mint,
+				MaxTime: maxt,
+			},
+			expected: [][]storepb.Label{
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+			},
+		},
+		{
+			req: &storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "b", Value: "2"},
+				},
+				MinTime: mint,
+				MaxTime: maxt,
+			},
+			expected: [][]storepb.Label{
+				{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+				{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+			},
+		},
+		{
+			// Matching by external label should work as well.
+			req: &storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
+					{Type: storepb.LabelMatcher_EQ, Name: "ext2", Value: "value2"},
+				},
+				MinTime: mint,
+				MaxTime: maxt,
+			},
+			expected: [][]storepb.Label{
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+				{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+			},
+		},
+		{
+			req: &storepb.SeriesRequest{
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
+					{Type: storepb.LabelMatcher_EQ, Name: "ext2", Value: "wrong-value"},
+				},
+				MinTime: mint,
+				MaxTime: maxt,
+			},
+		},
+	} {
+		t.Log("Run ", i)
+
+		// Always clean cache before each test.
+		s.store.indexCache, err = newIndexCache(nil, 100)
+		testutil.Ok(t, err)
+
+		srv := newStoreSeriesServer(ctx)
+
+		testutil.Ok(t, s.store.Series(tcase.req, srv))
+		testutil.Equals(t, len(tcase.expected), len(srv.SeriesSet))
+
+		for i, s := range srv.SeriesSet {
+			testutil.Equals(t, tcase.expected[i], s.Labels)
+			testutil.Equals(t, 3, len(s.Chunks))
+		}
+	}
+}
+
 func TestBucketStore_e2e(t *testing.T) {
 	objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
 		ctx, cancel := context.WithCancel(context.Background())
@@ -126,95 +219,38 @@ func TestBucketStore_e2e(t *testing.T) {
 		testutil.Ok(t, err)
 		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-		s := prepareStoreWithTestBlocks(t, dir, bkt)
+		s := prepareStoreWithTestBlocks(t, dir, bkt, false)
 		defer s.Close()
 
-		mint, maxt := s.store.TimeRange()
-		testutil.Equals(t, s.minTime, mint)
-		testutil.Equals(t, s.maxTime, maxt)
-
-		vals, err := s.store.LabelValues(ctx, &storepb.LabelValuesRequest{Label: "a"})
-		testutil.Ok(t, err)
-		testutil.Equals(t, []string{"1", "2"}, vals.Values)
-
-		pbseries := [][]storepb.Label{
-			{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
-			{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
-			{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
-			{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
-			{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
-			{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
-			{{Name: "a", Value: "2"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
-			{{Name: "a", Value: "2"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
-		}
-		srv := newStoreSeriesServer(ctx)
-
-		testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{
-			Matchers: []storepb.LabelMatcher{
-				{Type: storepb.LabelMatcher_RE, Name: "a", Value: "1|2"},
-			},
-			MinTime: mint,
-			MaxTime: maxt,
-		}, srv))
-		testutil.Equals(t, len(pbseries), len(srv.SeriesSet))
-
-		for i, s := range srv.SeriesSet {
-			testutil.Equals(t, pbseries[i], s.Labels)
-			testutil.Equals(t, 3, len(s.Chunks))
-		}
-
-		pbseries = [][]storepb.Label{
-			{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
-			{{Name: "a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
"a", Value: "2"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}}, - } - srv = newStoreSeriesServer(ctx) + testBucketStore_e2e(t, ctx, s) + }) +} - testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{ - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "b", Value: "2"}, - }, - MinTime: mint, - MaxTime: maxt, - }, srv)) - testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) +type naivePartitioner struct{} - for i, s := range srv.SeriesSet { - testutil.Equals(t, pbseries[i], s.Labels) - testutil.Equals(t, 3, len(s.Chunks)) - } +func (g naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) (parts []part) { + for i := 0; i < length; i++ { + s, e := rng(i) + parts = append(parts, part{start: s, end: e, elemRng: [2]int{i, i + 1}}) + } + return parts +} - // Matching by external label should work as well. - pbseries = [][]storepb.Label{ - {{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}}, - {{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}}, - } - srv = newStoreSeriesServer(ctx) +// Naive partitioner splits the array equally (it does not combine anything). +// This tests if our, sometimes concurrent, fetches for different parts works. +// Regression test against: https://github.com/improbable-eng/thanos/issues/829 +func TestBucketStore_ManyParts_e2e(t *testing.T) { + objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{ - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, - {Type: storepb.LabelMatcher_EQ, Name: "ext2", Value: "value2"}, - }, - MinTime: mint, - MaxTime: maxt, - }, srv)) - testutil.Equals(t, len(pbseries), len(srv.SeriesSet)) + dir, err := ioutil.TempDir("", "test_bucketstore_e2e") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - for i, s := range srv.SeriesSet { - testutil.Equals(t, pbseries[i], s.Labels) - testutil.Equals(t, 3, len(s.Chunks)) - } + s := prepareStoreWithTestBlocks(t, dir, bkt, true) + defer s.Close() - srv = newStoreSeriesServer(ctx) - testutil.Ok(t, s.store.Series(&storepb.SeriesRequest{ - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, - {Type: storepb.LabelMatcher_EQ, Name: "ext2", Value: "wrong-value"}, - }, - MinTime: mint, - MaxTime: maxt, - }, srv)) - testutil.Equals(t, 0, len(srv.SeriesSet)) + testBucketStore_e2e(t, ctx, s) }) - } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 7cc57da4cf..b0d43f23ce 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -214,7 +214,7 @@ func TestBucketBlockSet_labelMatchers(t *testing.T) { } } -func TestPartitionRanges(t *testing.T) { +func TestGapBasedPartitioner_Partition(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() const maxGapSize = 1024 * 512 @@ -267,9 +267,9 @@ func TestPartitionRanges(t *testing.T) { expected: []part{{start: 1, end: maxGapSize + 100, elemRng: [2]int{0, 3}}}, }, } { - res := partitionRanges(len(c.input), func(i int) (uint64, uint64) { + res := gapBasedPartitioner{maxGapSize: maxGapSize}.Partition(len(c.input), func(i int) (uint64, uint64) { return uint64(c.input[i][0]), uint64(c.input[i][1]) - }, maxGapSize) + }) testutil.Equals(t, c.expected, res) } }