-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store: Add Time & duration based partitioning #1408
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
opentracing "github.com/opentracing/opentracing-go" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/thanos-io/thanos/pkg/model" | ||
"github.com/thanos-io/thanos/pkg/objstore/client" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
"github.com/thanos-io/thanos/pkg/store" | ||
|
@@ -49,7 +50,18 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string | |
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage."). | ||
Default("20").Int() | ||
|
||
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). | ||
Default("0000-01-01T00:00:00Z")) | ||
|
||
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We definitely need the same logic for Prometheus sidecar. My point is that we should avoid fetching duplicated data from different stores when it's possible to decrease merging time and memory usage.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that makes sense to me. |
||
Default("9999-12-31T23:59:59Z")) | ||
|
||
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { | ||
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { | ||
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", | ||
minTime, maxTime) | ||
} | ||
|
||
return runStore(g, | ||
logger, | ||
reg, | ||
|
@@ -69,6 +81,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string | |
debugLogging, | ||
*syncInterval, | ||
*blockSyncConcurrency, | ||
&store.FilterConfig{ | ||
MinTime: *minTime, | ||
MaxTime: *maxTime, | ||
}, | ||
) | ||
} | ||
} | ||
|
@@ -94,6 +110,7 @@ func runStore( | |
verbose bool, | ||
syncInterval time.Duration, | ||
blockSyncConcurrency int, | ||
filterConf *store.FilterConfig, | ||
) error { | ||
{ | ||
confContentYaml, err := objStoreConfig.Content() | ||
|
@@ -135,6 +152,7 @@ func runStore( | |
maxConcurrent, | ||
verbose, | ||
blockSyncConcurrency, | ||
filterConf, | ||
) | ||
if err != nil { | ||
return errors.Wrap(err, "create object storage store") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package model | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/pkg/timestamp" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
// TimeOrDurationValue is a custom kingping parser for time in RFC3339 | ||
// or duration in Go's duration format, such as "300ms", "-1.5h" or "2h45m". | ||
// Only one will be set. | ||
type TimeOrDurationValue struct { | ||
Time *time.Time | ||
Dur *model.Duration | ||
} | ||
|
||
// Set converts string to TimeOrDurationValue. | ||
func (tdv *TimeOrDurationValue) Set(s string) error { | ||
t, err := time.Parse(time.RFC3339, s) | ||
if err == nil { | ||
tdv.Time = &t | ||
return nil | ||
} | ||
|
||
// error parsing time, let's try duration. | ||
var minus bool | ||
if s[0] == '-' { | ||
minus = true | ||
s = s[1:] | ||
} | ||
dur, err := model.ParseDuration(s) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if minus { | ||
dur = dur * -1 | ||
} | ||
tdv.Dur = &dur | ||
return nil | ||
} | ||
|
||
// String returns either tume or duration. | ||
func (tdv *TimeOrDurationValue) String() string { | ||
switch { | ||
case tdv.Time != nil: | ||
return tdv.Time.String() | ||
case tdv.Dur != nil: | ||
return tdv.Dur.String() | ||
} | ||
|
||
return "nil" | ||
} | ||
|
||
// PrometheusTimestamp returns TimeOrDurationValue converted to PrometheusTimestamp | ||
// if duration is set now+duration is converted to Timestamp. | ||
func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 { | ||
switch { | ||
case tdv.Time != nil: | ||
return timestamp.FromTime(*tdv.Time) | ||
case tdv.Dur != nil: | ||
return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.Dur))) | ||
} | ||
|
||
return 0 | ||
} | ||
|
||
// TimeOrDuration helper for parsing TimeOrDuration with kingpin. | ||
func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue { | ||
value := new(TimeOrDurationValue) | ||
flags.SetValue(value) | ||
return value | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package model_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/prometheus/prometheus/pkg/timestamp" | ||
"github.com/thanos-io/thanos/pkg/model" | ||
"github.com/thanos-io/thanos/pkg/testutil" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
func TestTimeOrDurationValue(t *testing.T) { | ||
cmd := kingpin.New("test", "test") | ||
|
||
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve")) | ||
|
||
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve"). | ||
Default("9999-12-31T23:59:59Z")) | ||
|
||
_, err := cmd.Parse([]string{"--min-time", "10s"}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
testutil.Equals(t, "10s", minTime.String()) | ||
testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String()) | ||
|
||
prevTime := timestamp.FromTime(time.Now()) | ||
afterTime := timestamp.FromTime(time.Now().Add(15 * time.Second)) | ||
|
||
testutil.Assert(t, minTime.PrometheusTimestamp() > prevTime, "minTime prometheus timestamp is less than time now.") | ||
testutil.Assert(t, minTime.PrometheusTimestamp() < afterTime, "minTime prometheus timestamp is more than time now + 15s") | ||
|
||
testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ import ( | |
"github.com/thanos-io/thanos/pkg/compact/downsample" | ||
"github.com/thanos-io/thanos/pkg/component" | ||
"github.com/thanos-io/thanos/pkg/extprom" | ||
"github.com/thanos-io/thanos/pkg/model" | ||
"github.com/thanos-io/thanos/pkg/objstore" | ||
"github.com/thanos-io/thanos/pkg/pool" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
|
@@ -182,6 +183,11 @@ type indexCache interface { | |
Series(b ulid.ULID, id uint64) ([]byte, bool) | ||
} | ||
|
||
// FilterConfig is a configuration, which Store uses for filtering metrics. | ||
type FilterConfig struct { | ||
MinTime, MaxTime model.TimeOrDurationValue | ||
} | ||
|
||
// BucketStore implements the store API backed by a bucket. It loads all index | ||
// files to local disk. | ||
type BucketStore struct { | ||
|
@@ -208,6 +214,8 @@ type BucketStore struct { | |
// samplesLimiter limits the number of samples per each Series() call. | ||
samplesLimiter *Limiter | ||
partitioner partitioner | ||
|
||
filterConfig *FilterConfig | ||
} | ||
|
||
// NewBucketStore creates a new bucket backed store that implements the store API against | ||
|
@@ -223,6 +231,7 @@ func NewBucketStore( | |
maxConcurrent int, | ||
debugLogging bool, | ||
blockSyncConcurrency int, | ||
filterConf *FilterConfig, | ||
) (*BucketStore, error) { | ||
if logger == nil { | ||
logger = log.NewNopLogger() | ||
|
@@ -256,6 +265,7 @@ func NewBucketStore( | |
), | ||
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), | ||
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, | ||
filterConfig: filterConf, | ||
} | ||
s.metrics = metrics | ||
|
||
|
@@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { | |
if err != nil { | ||
return nil | ||
} | ||
|
||
inRange, err := s.isBlockInMinMaxRange(ctx, id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the problem here is that we hit the issue of partial blocks... Something that #1394 touches. I thought we actually handle this in store gw properly (blocks without meta are excluded), seems not so not a regression. In any case we will just warn so 👍 |
||
if err != nil { | ||
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err) | ||
return nil | ||
} | ||
|
||
if !inRange { | ||
return nil | ||
} | ||
|
||
allIDs[id] = struct{}{} | ||
|
||
if b := s.getBlock(id); b != nil { | ||
|
@@ -377,6 +398,26 @@ func (s *BucketStore) numBlocks() int { | |
return len(s.blocks) | ||
} | ||
|
||
func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) { | ||
dir := filepath.Join(s.dir, id.String()) | ||
|
||
err, meta := loadMeta(ctx, s.logger, s.bucket, dir, id) | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
// We check for blocks in configured minTime, maxTime range. | ||
switch { | ||
case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp(): | ||
return false, nil | ||
|
||
case meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp(): | ||
return false, nil | ||
} | ||
|
||
return true, nil | ||
} | ||
|
||
func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { | ||
s.mtx.RLock() | ||
defer s.mtx.RUnlock() | ||
|
@@ -468,6 +509,10 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { | |
maxt = b.meta.MaxTime | ||
} | ||
} | ||
|
||
mint = s.limitMinTime(mint) | ||
maxt = s.limitMaxTime(maxt) | ||
|
||
return mint, maxt | ||
} | ||
|
||
|
@@ -482,6 +527,26 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info | |
}, nil | ||
} | ||
|
||
func (s *BucketStore) limitMinTime(mint int64) int64 { | ||
filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp() | ||
|
||
if mint < filterMinTime { | ||
return filterMinTime | ||
} | ||
|
||
return mint | ||
} | ||
|
||
func (s *BucketStore) limitMaxTime(maxt int64) int64 { | ||
filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp() | ||
|
||
if maxt > filterMaxTime { | ||
maxt = filterMaxTime | ||
} | ||
|
||
return maxt | ||
} | ||
|
||
type seriesEntry struct { | ||
lset []storepb.Label | ||
refs []uint64 | ||
|
@@ -722,6 +787,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie | |
if err != nil { | ||
return status.Error(codes.InvalidArgument, err.Error()) | ||
} | ||
req.MinTime = s.limitMinTime(req.MinTime) | ||
req.MaxTime = s.limitMaxTime(req.MaxTime) | ||
|
||
var ( | ||
stats = &queryStats{} | ||
g run.Group | ||
|
@@ -1091,9 +1159,12 @@ func newBucketBlock( | |
dir: dir, | ||
partitioner: p, | ||
} | ||
if err = b.loadMeta(ctx, id); err != nil { | ||
err, meta := loadMeta(ctx, logger, bkt, dir, id) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "load meta") | ||
} | ||
b.meta = meta | ||
|
||
if err = b.loadIndexCacheFile(ctx); err != nil { | ||
return nil, errors.Wrap(err, "load index cache") | ||
} | ||
|
@@ -1116,26 +1187,26 @@ func (b *bucketBlock) indexCacheFilename() string { | |
return path.Join(b.id.String(), block.IndexCacheFilename) | ||
} | ||
|
||
func (b *bucketBlock) loadMeta(ctx context.Context, id ulid.ULID) error { | ||
func loadMeta(ctx context.Context, logger log.Logger, bucket objstore.BucketReader, dir string, id ulid.ULID) (error, *metadata.Meta) { | ||
// If we haven't seen the block before download the meta.json file. | ||
if _, err := os.Stat(b.dir); os.IsNotExist(err) { | ||
if err := os.MkdirAll(b.dir, 0777); err != nil { | ||
return errors.Wrap(err, "create dir") | ||
if _, err := os.Stat(dir); os.IsNotExist(err) { | ||
if err := os.MkdirAll(dir, 0777); err != nil { | ||
return errors.Wrap(err, "create dir"), nil | ||
} | ||
src := path.Join(id.String(), block.MetaFilename) | ||
|
||
if err := objstore.DownloadFile(ctx, b.logger, b.bucket, src, b.dir); err != nil { | ||
return errors.Wrap(err, "download meta.json") | ||
if err := objstore.DownloadFile(ctx, logger, bucket, src, dir); err != nil { | ||
return errors.Wrap(err, "download meta.json"), nil | ||
} | ||
} else if err != nil { | ||
return err | ||
return err, nil | ||
} | ||
meta, err := metadata.Read(b.dir) | ||
meta, err := metadata.Read(dir) | ||
if err != nil { | ||
return errors.Wrap(err, "read meta.json") | ||
return errors.Wrap(err, "read meta.json"), nil | ||
} | ||
b.meta = meta | ||
return nil | ||
|
||
return nil, meta | ||
} | ||
|
||
func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default("0000-01-01T00:00:00Z"))
Is that lowest min time? What about-9999-12-31T23:59:59Z
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gives:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would set some explicit default value here (for
""
value), but we can do it in next PRs (: