Allow to customise the partitioner used by the BucketStore (#3802)
Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci authored Feb 15, 2021 · 1 parent f318232 · commit 1311d28
Showing 4 changed files with 25 additions and 10 deletions.

cmd/thanos/store.go (1 addition, 0 deletions)

@@ -316,6 +316,7 @@ func runStore(
 		chunkPool,
 		store.NewChunksLimiterFactory(maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
 		store.NewSeriesLimiterFactory(maxSeriesCount),
+		store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),
 		verbose,
 		blockSyncConcurrency,
 		filterConf,
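
Here, cmd/thanos/store.go keeps the previous behaviour by passing the default gap-based partitioner explicitly. The point of the change is that embedders of the store package can now supply their own value. A minimal sketch of what that could look like for a caller that wants more aggressive batching (the 2 MiB figure and variable names are illustrative, not part of this commit):

```go
package main

import "github.com/thanos-io/thanos/pkg/store"

func main() {
	// Default behaviour, as wired above: merge ranges separated by at most
	// 512 KiB (store.PartitionerMaxGapSize).
	defaultPartitioner := store.NewGapBasedPartitioner(store.PartitionerMaxGapSize)

	// Hypothetical alternative: tolerate up to 2 MiB of unused bytes between
	// merged ranges, issuing fewer but larger object-storage requests.
	aggressivePartitioner := store.NewGapBasedPartitioner(2 * 1024 * 1024)

	// Either partitioner can be passed to store.NewBucketStore through the
	// new partitioner argument shown in the diffs below.
	_, _ = defaultPartitioner, aggressivePartitioner
}
```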

pkg/store/bucket.go (13 additions, 6 deletions)

@@ -82,7 +82,7 @@ const (
 	// not too small (too much memory).
 	DefaultPostingOffsetInMemorySampling = 32
 
-	partitionerMaxGapSize = 512 * 1024
+	PartitionerMaxGapSize = 512 * 1024
 
 	// Labels for metrics.
 	labelEncode = "encode"
@@ -278,7 +278,7 @@ type BucketStore struct {
 	chunksLimiterFactory ChunksLimiterFactory
 	// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call.
 	seriesLimiterFactory SeriesLimiterFactory
-	partitioner partitioner
+	partitioner Partitioner
 
 	filterConfig *FilterConfig
 	advLabelSets []labelpb.ZLabelSet
@@ -303,6 +303,7 @@ func NewBucketStore(
 	chunkPool pool.BytesPool,
 	chunksLimiterFactory ChunksLimiterFactory,
 	seriesLimiterFactory SeriesLimiterFactory,
+	partitioner Partitioner,
 	debugLogging bool,
 	blockSyncConcurrency int,
 	filterConfig *FilterConfig,
@@ -332,7 +333,7 @@ func NewBucketStore(
 		queryGate: queryGate,
 		chunksLimiterFactory: chunksLimiterFactory,
 		seriesLimiterFactory: seriesLimiterFactory,
-		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+		partitioner: partitioner,
 		enableCompatibilityLabel: enableCompatibilityLabel,
 		postingOffsetsInMemSampling: postingOffsetsInMemSampling,
 		enableSeriesResponseHints: enableSeriesResponseHints,
@@ -1378,7 +1379,7 @@ type bucketBlock struct {
 
 	pendingReaders sync.WaitGroup
 
-	partitioner partitioner
+	partitioner Partitioner
 
 	// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
 	// request hints' BlockMatchers.
@@ -1395,7 +1396,7 @@ func newBucketBlock(
 	indexCache storecache.IndexCache,
 	chunkPool pool.BytesPool,
 	indexHeadReader indexheader.Reader,
-	p partitioner,
+	p Partitioner,
 ) (b *bucketBlock, err error) {
 	b = &bucketBlock{
 		logger: logger,
@@ -2034,7 +2035,7 @@ type part struct {
 	elemRng [2]int
 }
 
-type partitioner interface {
+type Partitioner interface {
 	// Partition partitions length entries into n <= length ranges that cover all
 	// input ranges
 	// It supports overlapping ranges.
@@ -2046,6 +2047,12 @@ type gapBasedPartitioner struct {
 	maxGapSize uint64
 }
 
+func NewGapBasedPartitioner(maxGapSize uint64) Partitioner {
+	return gapBasedPartitioner{
+		maxGapSize: maxGapSize,
+	}
+}
+
 // Partition partitions length entries into n <= length ranges that cover all
 // input ranges by combining entries that are separated by reasonably small gaps.
 // It is used to combine multiple small ranges from object storage into bigger, more efficient/cheaper ones.
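
The Partition comment above captures the trade-off the gap-based partitioner makes: fetching some unused bytes between nearby ranges is usually cheaper than issuing one object-storage request per range. Below is a self-contained sketch of that merging idea, assuming ranges sorted by start offset; byteRange and mergeByGap are invented for illustration and are not the store package's actual Partition implementation:

```go
package main

import "fmt"

// byteRange is an invented stand-in for the index/chunk byte ranges the
// store reads from object storage.
type byteRange struct{ start, end uint64 }

// mergeByGap merges sorted ranges whenever the gap to the previously merged
// range is at most maxGapSize, trading over-fetched bytes for fewer requests.
func mergeByGap(sorted []byteRange, maxGapSize uint64) []byteRange {
	var merged []byteRange
	for _, r := range sorted {
		if n := len(merged); n > 0 && r.start <= merged[n-1].end+maxGapSize {
			// Close enough to the previous range: extend it rather than
			// starting a new request.
			if r.end > merged[n-1].end {
				merged[n-1].end = r.end
			}
			continue
		}
		merged = append(merged, r)
	}
	return merged
}

func main() {
	ranges := []byteRange{{0, 100}, {150, 300}, {1 << 20, 1<<20 + 10}}
	// With the 512 KiB default gap the first two ranges collapse into one
	// request; the distant third range stays separate.
	fmt.Println(mergeByGap(ranges, 512*1024)) // [{0 300} {1048576 1048586}]
}
```

Note that the part type adjacent to the interface still appears unexported in this diff, so fully custom Partitioner implementations presumably remain internal to the store package for now; the immediately usable knob is the gap size passed to NewGapBasedPartitioner.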

pkg/store/bucket_e2e_test.go (1 addition, 0 deletions)

@@ -168,6 +168,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		chunkPool,
 		NewChunksLimiterFactory(maxChunksLimit),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		20,
 		filterConf,

pkg/store/bucket_test.go (10 additions, 4 deletions)

@@ -579,6 +579,7 @@ func TestBucketStore_Info(t *testing.T) {
 		chunkPool,
 		NewChunksLimiterFactory(0),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		20,
 		allowAllFilterConf,
@@ -834,6 +835,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
 		chunkPool,
 		NewChunksLimiterFactory(0),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		20,
 		allowAllFilterConf,
@@ -1146,7 +1148,7 @@ func benchmarkExpandedPostings(
 		indexCache: noopCache{},
 		bkt: bkt,
 		meta: &metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: id}},
-		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+		partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
 	}
 
 	indexr := newBucketIndexReader(context.Background(), b)
@@ -1262,7 +1264,7 @@ func benchBucketSeries(t testutil.TB, skipChunk bool, samplesPerSeries, totalSer
 		metrics: m,
 		bkt: bkt,
 		meta: meta,
-		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+		partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
 		chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
 		chunkPool: chunkPool,
 		extLset: extLset,
@@ -1436,7 +1438,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 		metrics: newBucketStoreMetrics(nil),
 		bkt: bkt,
 		meta: meta,
-		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+		partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
 		chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
 		chunkPool: chunkPool,
 	}
@@ -1475,7 +1477,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 		metrics: newBucketStoreMetrics(nil),
 		bkt: bkt,
 		meta: meta,
-		partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
+		partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
 		chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
 		chunkPool: chunkPool,
 	}
@@ -1656,6 +1658,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
 		chunkPool,
 		NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		10,
 		nil,
@@ -1753,6 +1756,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
 		chunkPool,
 		NewChunksLimiterFactory(100000/MaxSamplesPerChunk),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		10,
 		nil,
@@ -1901,6 +1905,7 @@ func TestBlockWithLargeChunks(t *testing.T) {
 		chunkPool,
 		NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		10,
 		nil,
@@ -2065,6 +2070,7 @@ func setupStoreForHintsTest(t *testing.T) (testutil.TB, *BucketStore, []*storepb
 		chunkPool,
 		NewChunksLimiterFactory(10000/MaxSamplesPerChunk),
 		NewSeriesLimiterFactory(0),
+		NewGapBasedPartitioner(PartitionerMaxGapSize),
 		false,
 		10,
 		nil,
