diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index eadcf9340f..78c225537b 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -311,6 +311,7 @@ func runStore( chunkPoolSizeBytes, store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. store.NewSeriesLimiterFactory(maxSeriesCount), + store.NewGapBasedPartitioner(store.PartitionerMaxGapSize), verbose, blockSyncConcurrency, filterConf, diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 26b02ab8ec..013dcddfa3 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -82,7 +82,7 @@ const ( // not too small (too much memory). DefaultPostingOffsetInMemorySampling = 32 - partitionerMaxGapSize = 512 * 1024 + PartitionerMaxGapSize = 512 * 1024 // PartitionerMaxGapSize is the maximum byte gap between two ranges that the gap-based partitioner still merges into a single object-storage read. // Labels for metrics. labelEncode = "encode" @@ -278,7 +278,7 @@ type BucketStore struct { chunksLimiterFactory ChunksLimiterFactory // seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call.
seriesLimiterFactory SeriesLimiterFactory - partitioner partitioner + partitioner Partitioner filterConfig *FilterConfig advLabelSets []labelpb.ZLabelSet @@ -303,6 +303,7 @@ func NewBucketStore( maxChunkPoolBytes uint64, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, + partitioner Partitioner, debugLogging bool, blockSyncConcurrency int, filterConfig *FilterConfig, @@ -337,7 +338,7 @@ func NewBucketStore( queryGate: queryGate, chunksLimiterFactory: chunksLimiterFactory, seriesLimiterFactory: seriesLimiterFactory, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: partitioner, enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, @@ -1383,7 +1384,7 @@ type bucketBlock struct { pendingReaders sync.WaitGroup - partitioner partitioner + partitioner Partitioner // Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using // request hints' BlockMatchers. @@ -1400,7 +1401,7 @@ func newBucketBlock( indexCache storecache.IndexCache, chunkPool pool.BytesPool, indexHeadReader indexheader.Reader, - p partitioner, + p Partitioner, ) (b *bucketBlock, err error) { b = &bucketBlock{ logger: logger, @@ -2039,7 +2040,7 @@ type part struct { elemRng [2]int } -type partitioner interface { +type Partitioner interface { // Partition partitions length entries into n <= length ranges that cover all // input ranges // It supports overlapping ranges. @@ -2051,6 +2052,13 @@ type gapBasedPartitioner struct { maxGapSize uint64 } +// NewGapBasedPartitioner returns a Partitioner that merges ranges separated by a gap of at most maxGapSize bytes. +func NewGapBasedPartitioner(maxGapSize uint64) Partitioner { + return gapBasedPartitioner{ + maxGapSize: maxGapSize, + } +} + // Partition partitions length entries into n <= length ranges that cover all // input ranges by combining entries that are separated by reasonably small gaps.
// It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones. diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 6626fb41a8..2b8a3cc102 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -165,6 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m 0, NewChunksLimiterFactory(maxChunksLimit), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, filterConf, diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index ac9a33ffcb..609a55724a 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -576,6 +576,7 @@ func TestBucketStore_Info(t *testing.T) { 2e5, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, allowAllFilterConf, @@ -828,6 +829,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul 0, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 20, allowAllFilterConf, @@ -1140,7 +1142,7 @@ func benchmarkExpandedPostings( indexCache: noopCache{}, bkt: bkt, meta: &metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: id}}, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), } indexr := newBucketIndexReader(context.Background(), b) @@ -1256,7 +1258,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer metrics: m, bkt: bkt, meta: meta, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, extLset: extLset, @@ -1430,7 +1432,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { metrics: newBucketStoreMetrics(nil), bkt: bkt, 
meta: meta, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, } @@ -1469,7 +1471,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { metrics: newBucketStoreMetrics(nil), bkt: bkt, meta: meta, - partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize}, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, chunkPool: chunkPool, } @@ -1647,6 +1649,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { 1000000, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil, @@ -1741,6 +1744,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { 1000000, NewChunksLimiterFactory(100000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil, @@ -1886,6 +1890,7 @@ func TestBlockWithLargeChunks(t *testing.T) { 1000000, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil, @@ -2047,6 +2052,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb 1000000, NewChunksLimiterFactory(10000/MaxSamplesPerChunk), NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), false, 10, nil,