Skip to content

Commit

Permalink
store: Cleaned up API for test/benchmark purposes. (#3650)
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Feb 26, 2021
1 parent 0d86697 commit 9001546
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 141 deletions.
2 changes: 1 addition & 1 deletion pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
}

step, apiErr := qapi.parseStep(r, qapi.defaultRangeQueryStep, int64(end.Sub(start)/time.Second))
if err != nil {
if apiErr != nil {
return nil, nil, apiErr
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
}, nil
}

// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
// NOTE: Not suitable to use in production.
func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) (*MetaFetcher, error) {
return NewMetaFetcher(logger, 1, bkt, "", nil, nil, nil)
}

// NewMetaFetcher returns meta fetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
Expand Down
29 changes: 21 additions & 8 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,28 @@ import (
"github.com/pkg/errors"
)

type BytesPool interface {
// Bytes is a pool of bytes that can be reused.
type Bytes interface {
// Get returns a new byte slices that fits the given size.
Get(sz int) (*[]byte, error)
// Put returns a byte slice to the right bucket in the pool.
Put(b *[]byte)
}

// BucketedBytesPool is a bucketed pool for variably sized byte slices. It can be configured to not allow
// NoopBytes is pool that always allocated required slice on heap and ignore puts.
type NoopBytes struct{}

func (p NoopBytes) Get(sz int) (*[]byte, error) {
b := make([]byte, 0, sz)
return &b, nil
}

func (p NoopBytes) Put(*[]byte) {}

// BucketedBytes is a bucketed pool for variably sized byte slices. It can be configured to not allow
// more than a maximum number of bytes being used at a given time.
// Every byte slice obtained from the pool must be returned.
type BucketedBytesPool struct {
type BucketedBytes struct {
buckets []sync.Pool
sizes []int
maxTotal uint64
Expand All @@ -27,10 +40,10 @@ type BucketedBytesPool struct {
new func(s int) *[]byte
}

// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize
// NewBucketedBytes returns a new Bytes with size buckets for minSize to maxSize
// increasing by the given factor and maximum number of used bytes.
// No more than maxTotal bytes can be used at any given time unless maxTotal is set to 0.
func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytesPool, error) {
func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytes, error) {
if minSize < 1 {
return nil, errors.New("invalid minimum pool size")
}
Expand All @@ -46,7 +59,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64)
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &BucketedBytesPool{
p := &BucketedBytes{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
maxTotal: maxTotal,
Expand All @@ -62,7 +75,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64)
var ErrPoolExhausted = errors.New("pool exhausted")

// Get returns a new byte slice that fits the given size.
func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) {
func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

Expand All @@ -89,7 +102,7 @@ func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) {
}

// Put returns a byte slice to the right bucket in the pool.
func (p *BucketedBytesPool) Put(b *[]byte) {
func (p *BucketedBytes) Put(b *[]byte) {
if b == nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestMain(m *testing.M) {
}

func TestBytesPool(t *testing.T) {
chunkPool, err := NewBucketedBytesPool(10, 100, 2, 1000)
chunkPool, err := NewBucketedBytes(10, 100, 2, 1000)
testutil.Ok(t, err)

testutil.Equals(t, []int{10, 20, 40, 80}, chunkPool.sizes)
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestBytesPool(t *testing.T) {
}

func TestRacePutGet(t *testing.T) {
chunkPool, err := NewBucketedBytesPool(3, 100, 2, 5000)
chunkPool, err := NewBucketedBytes(3, 100, 2, 5000)
testutil.Ok(t, err)

s := sync.WaitGroup{}
Expand Down
23 changes: 0 additions & 23 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
Expand All @@ -24,28 +23,6 @@ func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }

func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
Expand Down
34 changes: 34 additions & 0 deletions pkg/query/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

var (
Expand Down Expand Up @@ -619,3 +622,34 @@ type clearCmd struct{}
func (cmd clearCmd) String() string {
return "clear"
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func NewInProcessClient(t testing.TB, name string, client storepb.StoreClient, extLset labels.Labels) store.Client {
return inProcessClient{
t: t,
name: name,
StoreClient: client,
extLset: extLset,
}
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }
50 changes: 40 additions & 10 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ const (
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
MaxSamplesPerChunk = 120
maxChunkSize = 16000
maxSeriesSize = 64 * 1024
// EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases.
EstimatedMaxChunkSize = 16000
maxSeriesSize = 64 * 1024

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
Expand Down Expand Up @@ -258,7 +259,7 @@ type BucketStore struct {
dir string
indexCache storecache.IndexCache
indexReaderPool *indexheader.ReaderPool
chunkPool pool.BytesPool
chunkPool pool.Bytes

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
mtx sync.RWMutex
Expand All @@ -283,14 +284,33 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

// Enables hints in the Series() response.
enableSeriesResponseHints bool
}

type noopCache struct{}

func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreSeries(context.Context, ulid.ULID, uint64, []byte) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) {
return map[uint64][]byte{}, ids
}

type noopGate struct{}

func (noopGate) Start(context.Context) error { return nil }
func (noopGate) Done() {}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
// TODO(bwplotka): Move to config at this point.
func NewBucketStore(
logger log.Logger,
reg prometheus.Registerer,
Expand All @@ -299,7 +319,7 @@ func NewBucketStore(
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
chunkPool pool.BytesPool,
chunkPool pool.Bytes,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioner Partitioner,
Expand All @@ -316,6 +336,16 @@ func NewBucketStore(
logger = log.NewNopLogger()
}

if chunkPool == nil {
chunkPool = pool.NoopBytes{}
}
if indexCache == nil {
indexCache = noopCache{}
}
if queryGate == nil {
queryGate = noopGate{}
}

s := &BucketStore{
logger: logger,
bkt: bkt,
Expand Down Expand Up @@ -1369,7 +1399,7 @@ type bucketBlock struct {
meta *metadata.Meta
dir string
indexCache storecache.IndexCache
chunkPool pool.BytesPool
chunkPool pool.Bytes
extLset labels.Labels

indexHeaderReader indexheader.Reader
Expand All @@ -1393,7 +1423,7 @@ func newBucketBlock(
bkt objstore.BucketReader,
dir string,
indexCache storecache.IndexCache,
chunkPool pool.BytesPool,
chunkPool pool.Bytes,
indexHeadReader indexheader.Reader,
p Partitioner,
) (b *bucketBlock, err error) {
Expand Down Expand Up @@ -2228,7 +2258,7 @@ func (r *bucketChunkReader) preload() error {
return offsets[i] < offsets[j]
})
parts := r.block.partitioner.Partition(len(offsets), func(i int) (start, end uint64) {
return uint64(offsets[i]), uint64(offsets[i]) + maxChunkSize
return uint64(offsets[i]), uint64(offsets[i]) + EstimatedMaxChunkSize
})

seq := seq
Expand Down Expand Up @@ -2337,7 +2367,7 @@ func chunkOffsetsToByteRanges(offsets []uint32, start uint32) byteRanges {
ranges[idx] = byteRange{
// The byte range offset is required to be relative to the start of the read slice.
offset: int(offsets[idx] - start),
length: maxChunkSize,
length: EstimatedMaxChunkSize,
}

if idx > 0 {
Expand Down Expand Up @@ -2480,6 +2510,6 @@ func (s queryStats) merge(o *queryStats) *queryStats {
}

// NewDefaultChunkBytesPool returns a chunk bytes pool with default settings.
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.BytesPool, error) {
return pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) {
return pool.NewBucketedBytes(EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes)
}
17 changes: 1 addition & 16 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ var (
}
)

type noopCache struct{}

func (noopCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) {}
func (noopCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) {}
func (noopCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) {
return map[uint64][]byte{}, ids
}

type swappableCache struct {
ptr storecache.IndexCache
}
Expand Down Expand Up @@ -154,9 +142,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
}, nil)
testutil.Ok(t, err)

chunkPool, err := NewDefaultChunkBytesPool(0)
testutil.Ok(t, err)

store, err := NewBucketStore(
s.logger,
nil,
Expand All @@ -165,7 +150,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
dir,
s.cache,
nil,
chunkPool,
nil,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
Expand Down
Loading

0 comments on commit 9001546

Please sign in to comment.