Store: Add Time & duration based partitioning #1408

Merged (4 commits, Aug 28, 2019)
Changes from 3 commits
18 changes: 18 additions & 0 deletions cmd/thanos/store.go
@@ -11,6 +11,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
@@ -49,7 +50,18 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Member:
Is `Default("0000-01-01T00:00:00Z"))` the lowest min time? What about `-9999-12-31T23:59:59Z`?

Member Author:

    t, err := time.Parse(time.RFC3339, "-9999-12-31T23:59:59Z")
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println(t)

gives:

parsing time "-9999-12-31T23:59:59Z" as "2006-01-02T15:04:05Z07:00": cannot parse "-9999-12-31T23:59:59Z" as "2006"        

Member:
I would set some explicit default value here (for the "" value), but we can do it in a follow-up PR (:

Default("0000-01-01T00:00:00Z"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Contributor:
We definitely need the same logic for the Prometheus sidecar.

My point is that we should avoid fetching duplicated data from different stores where possible, to reduce merging time and memory usage.

  1. If Prom retention = 2h, that doesn't mean 2h is all the data Prom will return all the time. Usually you will see more than 2h in a response (up to retention x 2). We can avoid data intersection just by adding max-time to queries.
  2. Second important thing: we would like to have alerts that use more than 2h of data. There are not many such queries, but we have them. It's a good idea to run these queries on local Prom data, so we need to increase retention up to 7d. But by increasing retention we also increase memory usage (query cache) and network traffic, and Thanos Query will merge more data (even though we know it's the same data).

Member:
I think that makes sense to me.

Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
}

return runStore(g,
logger,
reg,
@@ -69,6 +81,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
&store.FilterConfig{
MinTime: *minTime,
MaxTime: *maxTime,
},
)
}
}
@@ -94,6 +110,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
@@ -135,6 +152,7 @@ func runStore(
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
32 changes: 32 additions & 0 deletions docs/components/store.md
@@ -84,5 +84,37 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only metrics, which happened later
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which happened earlier than
this value. Option can be a constant time in
RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.

```

## Time based partitioning

By default Thanos Store Gateway looks at all the data in Object Store and returns it based on the query's time range.

The Thanos Store `--min-time` and `--max-time` flags allow you to shard Thanos Store based on a constant time or a time duration relative to the current time.

For example, setting `--min-time=-6w` & `--max-time=-2w` will make Thanos Store Gateway return metrics that fall within the `now - 6 weeks` up to `now - 2 weeks` time range.

Constant time needs to be set in RFC3339 format. For example `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`.

Thanos Store Gateway might not get new blocks immediately, as time partitioning is partly done in the asynchronous block synchronization job, which by default runs every 3 minutes. Additionally, some Object Store implementations provide eventual read-after-write consistency, which means that Thanos Store might not immediately see newly created & uploaded blocks anyway.

We recommend having overlapping time ranges with Thanos Sidecar and other Thanos Store gateways, as this will improve your resiliency to failures.

Thanos Querier deals with overlapping time series by merging them together.

Filtering is done at the chunk level, so Thanos Store might still return samples which are outside of `--min-time` & `--max-time`.
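
To make the documented example concrete, here is a small sketch (not part of this PR; the kingpin application and flag defaults are illustrative only) that resolves the same relative bounds through the `model.TimeOrDuration` helper added in `pkg/model/timeduration.go` and prints the resulting Prometheus timestamps (milliseconds since the Unix epoch):

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/model"
	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	// Mirror the docs example: serve only blocks between now-6w and now-2w.
	app := kingpin.New("example", "example")
	minTime := model.TimeOrDuration(app.Flag("min-time", "start of range").Default("-6w"))
	maxTime := model.TimeOrDuration(app.Flag("max-time", "end of range").Default("-2w"))
	if _, err := app.Parse([]string{}); err != nil {
		panic(err)
	}

	// Relative durations are resolved against time.Now() at call time.
	fmt.Println("min:", minTime.PrometheusTimestamp())
	fmt.Println("max:", maxTime.PrometheusTimestamp())
}
```
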
75 changes: 75 additions & 0 deletions pkg/model/timeduration.go
@@ -0,0 +1,75 @@
package model

import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"gopkg.in/alecthomas/kingpin.v2"
)

// TimeOrDurationValue is a custom kingpin parser for time in RFC3339
// or duration in Go's duration format, such as "300ms", "-1.5h" or "2h45m".
// Only one will be set.
type TimeOrDurationValue struct {
Time *time.Time
Dur *model.Duration
}

// Set converts string to TimeOrDurationValue.
func (tdv *TimeOrDurationValue) Set(s string) error {
t, err := time.Parse(time.RFC3339, s)
if err == nil {
tdv.Time = &t
return nil
}

// error parsing time, let's try duration.
var minus bool
if s[0] == '-' {
minus = true
s = s[1:]
}
dur, err := model.ParseDuration(s)
if err != nil {
return err
}

if minus {
dur = dur * -1
}
tdv.Dur = &dur
return nil
}

// String returns either time or duration.
func (tdv *TimeOrDurationValue) String() string {
switch {
case tdv.Time != nil:
return tdv.Time.String()
case tdv.Dur != nil:
return tdv.Dur.String()
}

return "nil"
}

// PrometheusTimestamp returns the TimeOrDurationValue converted to a Prometheus timestamp;
// if a duration is set, now+duration is converted to a timestamp.
func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
switch {
case tdv.Time != nil:
return timestamp.FromTime(*tdv.Time)
case tdv.Dur != nil:
return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.Dur)))
}

return 0
}

// TimeOrDuration helper for parsing TimeOrDuration with kingpin.
func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue {
value := new(TimeOrDurationValue)
flags.SetValue(value)
return value
}
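
For reference, a short usage sketch (not part of the diff) exercising the two input forms the new `TimeOrDurationValue` accepts, an absolute RFC3339 time and a duration relative to now:

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/model"
)

func main() {
	var abs, rel model.TimeOrDurationValue

	// Absolute RFC3339 time: stored in the Time field.
	if err := abs.Set("2019-01-01T23:59:59Z"); err != nil {
		panic(err)
	}
	// Relative duration: stored in the Dur field and resolved against time.Now().
	if err := rel.Set("-2w"); err != nil {
		panic(err)
	}

	// Both resolve to Prometheus timestamps (milliseconds since the Unix epoch).
	fmt.Println(abs.PrometheusTimestamp(), rel.PrometheusTimestamp())
}
```
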
36 changes: 36 additions & 0 deletions pkg/model/timeduration_test.go
@@ -0,0 +1,36 @@
package model_test

import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/testutil"
"gopkg.in/alecthomas/kingpin.v2"
)

func TestTimeOrDurationValue(t *testing.T) {
cmd := kingpin.New("test", "test")

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
Default("9999-12-31T23:59:59Z"))

_, err := cmd.Parse([]string{"--min-time", "10s"})
if err != nil {
t.Fatal(err)
}

testutil.Equals(t, "10s", minTime.String())
testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String())

prevTime := timestamp.FromTime(time.Now())
afterTime := timestamp.FromTime(time.Now().Add(15 * time.Second))

testutil.Assert(t, minTime.PrometheusTimestamp() > prevTime, "minTime prometheus timestamp is less than time now.")
testutil.Assert(t, minTime.PrometheusTimestamp() < afterTime, "minTime prometheus timestamp is more than time now + 15s")

testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000")
}
95 changes: 83 additions & 12 deletions pkg/store/bucket.go
@@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -182,6 +183,11 @@ type indexCache interface {
Series(b ulid.ULID, id uint64) ([]byte, bool)
}

// FilterConfig is a configuration which the Store uses for filtering metrics.
type FilterConfig struct {
MinTime, MaxTime model.TimeOrDurationValue
}

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
type BucketStore struct {
@@ -208,6 +214,8 @@ type BucketStore struct {
// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner

filterConfig *FilterConfig
}

// NewBucketStore creates a new bucket backed store that implements the store API against
@@ -223,6 +231,7 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
filterConf *FilterConfig,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
@@ -256,6 +265,7 @@
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
filterConfig: filterConf,
}
s.metrics = metrics

@@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
if err != nil {
return nil
}

inRange, err := s.isBlockInMinMaxRange(ctx, id)
Member:
I think the problem here is that we hit the issue of partial blocks... Something that #1394 touches.

I thought we actually handled this properly in store gw (blocks without meta are excluded); seems not, so it's not a regression.

In any case we will just warn so 👍

if err != nil {
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
return nil
}

if !inRange {
return nil
}

allIDs[id] = struct{}{}

if b := s.getBlock(id); b != nil {
Expand Down Expand Up @@ -377,6 +398,26 @@ func (s *BucketStore) numBlocks() int {
return len(s.blocks)
}

func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
dir := filepath.Join(s.dir, id.String())

err, meta := loadMeta(ctx, s.logger, s.bucket, dir, id)
if err != nil {
return false, err
}

// We check for blocks in the configured minTime, maxTime range.
switch {
case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
return false, nil

case meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
return false, nil
}

return true, nil
}

func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
s.mtx.RLock()
defer s.mtx.RUnlock()
@@ -468,6 +509,10 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
maxt = b.meta.MaxTime
}
}

mint = s.limitMinTime(mint)
maxt = s.limitMaxTime(maxt)

return mint, maxt
}

@@ -482,6 +527,26 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
}, nil
}

func (s *BucketStore) limitMinTime(mint int64) int64 {
filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()

if mint < filterMinTime {
return filterMinTime
}

return mint
}

func (s *BucketStore) limitMaxTime(maxt int64) int64 {
filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()

if maxt > filterMaxTime {
maxt = filterMaxTime
}

return maxt
}

type seriesEntry struct {
lset []storepb.Label
refs []uint64
@@ -722,6 +787,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
req.MinTime = s.limitMinTime(req.MinTime)
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
stats = &queryStats{}
g run.Group
@@ -1091,9 +1159,12 @@ func newBucketBlock(
dir: dir,
partitioner: p,
}
if err = b.loadMeta(ctx, id); err != nil {
err, meta := loadMeta(ctx, logger, bkt, dir, id)
if err != nil {
return nil, errors.Wrap(err, "load meta")
}
b.meta = meta

if err = b.loadIndexCacheFile(ctx); err != nil {
return nil, errors.Wrap(err, "load index cache")
}
@@ -1116,26 +1187,26 @@ func (b *bucketBlock) indexCacheFilename() string {
return path.Join(b.id.String(), block.IndexCacheFilename)
}

func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error {
func loadMeta(ctx context.Context, logger log.Logger, bucket objstore.BucketReader, dir string, id ulid.ULID) (error, *metadata.Meta) {
// If we haven't seen the block before download the meta.json file.
if _, err := os.Stat(b.dir); os.IsNotExist(err) {
if err := os.MkdirAll(b.dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
if _, err := os.Stat(dir); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir"), nil
}
src := path.Join(id.String(), block.MetaFilename)

if err := objstore.DownloadFile(ctx, b.logger, b.bucket, src, b.dir); err != nil {
return errors.Wrap(err, "download meta.json")
if err := objstore.DownloadFile(ctx, logger, bucket, src, dir); err != nil {
return errors.Wrap(err, "download meta.json"), nil
}
} else if err != nil {
return err
return err, nil
}
meta, err := metadata.Read(b.dir)
meta, err := metadata.Read(dir)
if err != nil {
return errors.Wrap(err, "read meta.json")
return errors.Wrap(err, "read meta.json"), nil
}
b.meta = meta
return nil

return nil, meta
}

func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) {