Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Cleaned up API for test/benchmark purposes. #3650

Merged
merged 1 commit into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
}

step, apiErr := qapi.parseStep(r, qapi.defaultRangeQueryStep, int64(end.Sub(start)/time.Second))
if err != nil {
if apiErr != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing. How did you catch this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Laser eyes

return nil, nil, apiErr
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
}, nil
}

// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
// NOTE: Not suitable to use in production.
func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) (*MetaFetcher, error) {
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
return NewMetaFetcher(logger, 1, bkt, "", nil, nil, nil)
}

// NewMetaFetcher returns meta fetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
Expand Down
29 changes: 21 additions & 8 deletions pkg/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,28 @@ import (
"github.com/pkg/errors"
)

type BytesPool interface {
// Bytes is a pool of bytes that can be reused.
type Bytes interface {
yeya24 marked this conversation as resolved.
Show resolved Hide resolved
// Get returns a new byte slices that fits the given size.
Get(sz int) (*[]byte, error)
// Put returns a byte slice to the right bucket in the pool.
Put(b *[]byte)
}

// BucketedBytesPool is a bucketed pool for variably sized byte slices. It can be configured to not allow
// NoopBytes is pool that always allocated required slice on heap and ignore puts.
type NoopBytes struct{}

func (p NoopBytes) Get(sz int) (*[]byte, error) {
b := make([]byte, 0, sz)
return &b, nil
}

func (p NoopBytes) Put(*[]byte) {}

// BucketedBytes is a bucketed pool for variably sized byte slices. It can be configured to not allow
// more than a maximum number of bytes being used at a given time.
// Every byte slice obtained from the pool must be returned.
type BucketedBytesPool struct {
type BucketedBytes struct {
buckets []sync.Pool
sizes []int
maxTotal uint64
Expand All @@ -27,10 +40,10 @@ type BucketedBytesPool struct {
new func(s int) *[]byte
}

// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize
// NewBucketedBytes returns a new Bytes with size buckets for minSize to maxSize
// increasing by the given factor and maximum number of used bytes.
// No more than maxTotal bytes can be used at any given time unless maxTotal is set to 0.
func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytesPool, error) {
func NewBucketedBytes(minSize, maxSize int, factor float64, maxTotal uint64) (*BucketedBytes, error) {
if minSize < 1 {
return nil, errors.New("invalid minimum pool size")
}
Expand All @@ -46,7 +59,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64)
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &BucketedBytesPool{
p := &BucketedBytes{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
maxTotal: maxTotal,
Expand All @@ -62,7 +75,7 @@ func NewBucketedBytesPool(minSize, maxSize int, factor float64, maxTotal uint64)
var ErrPoolExhausted = errors.New("pool exhausted")

// Get returns a new byte slice that fits the given size.
func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) {
func (p *BucketedBytes) Get(sz int) (*[]byte, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

Expand All @@ -89,7 +102,7 @@ func (p *BucketedBytesPool) Get(sz int) (*[]byte, error) {
}

// Put returns a byte slice to the right bucket in the pool.
func (p *BucketedBytesPool) Put(b *[]byte) {
func (p *BucketedBytes) Put(b *[]byte) {
if b == nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestMain(m *testing.M) {
}

func TestBytesPool(t *testing.T) {
chunkPool, err := NewBucketedBytesPool(10, 100, 2, 1000)
chunkPool, err := NewBucketedBytes(10, 100, 2, 1000)
testutil.Ok(t, err)

testutil.Equals(t, []int{10, 20, 40, 80}, chunkPool.sizes)
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestBytesPool(t *testing.T) {
}

func TestRacePutGet(t *testing.T) {
chunkPool, err := NewBucketedBytesPool(3, 100, 2, 5000)
chunkPool, err := NewBucketedBytes(3, 100, 2, 5000)
testutil.Ok(t, err)

s := sync.WaitGroup{}
Expand Down
23 changes: 0 additions & 23 deletions pkg/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
Expand All @@ -24,28 +23,6 @@ func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }

func TestQuerier_Proxy(t *testing.T) {
files, err := filepath.Glob("testdata/promql/**/*.test")
testutil.Ok(t, err)
Expand Down
34 changes: 34 additions & 0 deletions pkg/query/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

var (
Expand Down Expand Up @@ -619,3 +622,34 @@ type clearCmd struct{}
func (cmd clearCmd) String() string {
return "clear"
}

type inProcessClient struct {
t testing.TB

name string

storepb.StoreClient
extLset labels.Labels
}

func NewInProcessClient(t testing.TB, name string, client storepb.StoreClient, extLset labels.Labels) store.Client {
return inProcessClient{
t: t,
name: name,
StoreClient: client,
extLset: extLset,
}
}

func (i inProcessClient) LabelSets() []labels.Labels {
return []labels.Labels{i.extLset}
}

func (i inProcessClient) TimeRange() (mint int64, maxt int64) {
r, err := i.Info(context.TODO(), &storepb.InfoRequest{})
testutil.Ok(i.t, err)
return r.MinTime, r.MaxTime
}

func (i inProcessClient) String() string { return i.name }
func (i inProcessClient) Addr() string { return i.name }
50 changes: 40 additions & 10 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ const (
// because you barely get any improvements in compression when the number of samples is beyond this.
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
MaxSamplesPerChunk = 120
maxChunkSize = 16000
maxSeriesSize = 64 * 1024
// EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases.
EstimatedMaxChunkSize = 16000
maxSeriesSize = 64 * 1024

// CompatibilityTypeLabelName is an artificial label that Store Gateway can optionally advertise. This is required for compatibility
// with pre v0.8.0 Querier. Previous Queriers was strict about duplicated external labels of all StoreAPIs that had any labels.
Expand Down Expand Up @@ -258,7 +259,7 @@ type BucketStore struct {
dir string
indexCache storecache.IndexCache
indexReaderPool *indexheader.ReaderPool
chunkPool pool.BytesPool
chunkPool pool.Bytes

// Sets of blocks that have the same labels. They are indexed by a hash over their label set.
mtx sync.RWMutex
Expand All @@ -283,14 +284,33 @@ type BucketStore struct {
advLabelSets []labelpb.ZLabelSet
enableCompatibilityLabel bool

// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

// Enables hints in the Series() response.
enableSeriesResponseHints bool
}

type noopCache struct{}

func (noopCache) StorePostings(context.Context, ulid.ULID, labels.Label, []byte) {}
func (noopCache) FetchMultiPostings(_ context.Context, _ ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreSeries(context.Context, ulid.ULID, uint64, []byte) {}
func (noopCache) FetchMultiSeries(_ context.Context, _ ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) {
return map[uint64][]byte{}, ids
}

type noopGate struct{}

func (noopGate) Start(context.Context) error { return nil }
func (noopGate) Done() {}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
// TODO(bwplotka): Move to config at this point.
func NewBucketStore(
logger log.Logger,
reg prometheus.Registerer,
Expand All @@ -299,7 +319,7 @@ func NewBucketStore(
dir string,
indexCache storecache.IndexCache,
queryGate gate.Gate,
chunkPool pool.BytesPool,
chunkPool pool.Bytes,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
partitioner Partitioner,
Expand All @@ -316,6 +336,16 @@ func NewBucketStore(
logger = log.NewNopLogger()
}

if chunkPool == nil {
chunkPool = pool.NoopBytes{}
}
if indexCache == nil {
indexCache = noopCache{}
}
if queryGate == nil {
queryGate = noopGate{}
}

s := &BucketStore{
logger: logger,
bkt: bkt,
Expand Down Expand Up @@ -1369,7 +1399,7 @@ type bucketBlock struct {
meta *metadata.Meta
dir string
indexCache storecache.IndexCache
chunkPool pool.BytesPool
chunkPool pool.Bytes
extLset labels.Labels

indexHeaderReader indexheader.Reader
Expand All @@ -1393,7 +1423,7 @@ func newBucketBlock(
bkt objstore.BucketReader,
dir string,
indexCache storecache.IndexCache,
chunkPool pool.BytesPool,
chunkPool pool.Bytes,
indexHeadReader indexheader.Reader,
p Partitioner,
) (b *bucketBlock, err error) {
Expand Down Expand Up @@ -2228,7 +2258,7 @@ func (r *bucketChunkReader) preload() error {
return offsets[i] < offsets[j]
})
parts := r.block.partitioner.Partition(len(offsets), func(i int) (start, end uint64) {
return uint64(offsets[i]), uint64(offsets[i]) + maxChunkSize
return uint64(offsets[i]), uint64(offsets[i]) + EstimatedMaxChunkSize
})

seq := seq
Expand Down Expand Up @@ -2337,7 +2367,7 @@ func chunkOffsetsToByteRanges(offsets []uint32, start uint32) byteRanges {
ranges[idx] = byteRange{
// The byte range offset is required to be relative to the start of the read slice.
offset: int(offsets[idx] - start),
length: maxChunkSize,
length: EstimatedMaxChunkSize,
}

if idx > 0 {
Expand Down Expand Up @@ -2480,6 +2510,6 @@ func (s queryStats) merge(o *queryStats) *queryStats {
}

// NewDefaultChunkBytesPool returns a chunk bytes pool with default settings.
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.BytesPool, error) {
return pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, maxChunkPoolBytes)
func NewDefaultChunkBytesPool(maxChunkPoolBytes uint64) (pool.Bytes, error) {
return pool.NewBucketedBytes(EstimatedMaxChunkSize, 50e6, 2, maxChunkPoolBytes)
}
17 changes: 1 addition & 16 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,6 @@ var (
}
)

type noopCache struct{}

func (noopCache) StorePostings(ctx context.Context, blockID ulid.ULID, l labels.Label, v []byte) {}
func (noopCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (map[labels.Label][]byte, []labels.Label) {
return map[labels.Label][]byte{}, keys
}

func (noopCache) StoreSeries(ctx context.Context, blockID ulid.ULID, id uint64, v []byte) {}
func (noopCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []uint64) (map[uint64][]byte, []uint64) {
return map[uint64][]byte{}, ids
}

type swappableCache struct {
ptr storecache.IndexCache
}
Expand Down Expand Up @@ -154,9 +142,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
}, nil)
testutil.Ok(t, err)

chunkPool, err := NewDefaultChunkBytesPool(0)
testutil.Ok(t, err)

store, err := NewBucketStore(
s.logger,
nil,
Expand All @@ -165,7 +150,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
dir,
s.cache,
nil,
chunkPool,
nil,
NewChunksLimiterFactory(maxChunksLimit),
NewSeriesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
Expand Down
Loading