-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Store: Add Time & duration based partitioning #1408
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
opentracing "github.com/opentracing/opentracing-go" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/thanos-io/thanos/pkg/model" | ||
"github.com/thanos-io/thanos/pkg/objstore/client" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
"github.com/thanos-io/thanos/pkg/store" | ||
|
@@ -49,7 +50,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string | |
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage."). | ||
Default("20").Int() | ||
|
||
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). | ||
Default("0000-01-01T00:00:00Z")) | ||
|
||
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We definitely need the same logic for Prometheus sidecar. My point is that we should avoid fetching duplicated data from different stores when it's possible to decrease merging time and memory usage.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that makes sense to me. |
||
Default("9999-12-31T23:59:59Z")) | ||
|
||
m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { | ||
// Sanity check Time filters | ||
povilasv marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { | ||
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", | ||
minTime, maxTime) | ||
} | ||
|
||
return runStore(g, | ||
logger, | ||
reg, | ||
|
@@ -69,6 +82,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string | |
debugLogging, | ||
*syncInterval, | ||
*blockSyncConcurrency, | ||
&store.FilterConfig{ | ||
MinTime: *minTime, | ||
MaxTime: *maxTime, | ||
}, | ||
) | ||
} | ||
} | ||
|
@@ -94,6 +111,7 @@ func runStore( | |
verbose bool, | ||
syncInterval time.Duration, | ||
blockSyncConcurrency int, | ||
filterConf *store.FilterConfig, | ||
) error { | ||
{ | ||
confContentYaml, err := objStoreConfig.Content() | ||
|
@@ -135,6 +153,7 @@ func runStore( | |
maxConcurrent, | ||
verbose, | ||
blockSyncConcurrency, | ||
filterConf, | ||
) | ||
if err != nil { | ||
return errors.Wrap(err, "create object storage store") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package model | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/pkg/timestamp" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
// TimeOrDurationValue is a custom kingpin parser for a time in RFC3339 format | ||
// or a duration string accepted by model.ParseDuration, optionally prefixed | ||
// with '-' to make the duration negative (see Set). | ||
// Only one will be set. | ||
type TimeOrDurationValue struct { | ||
// Time is the absolute instant, set when the input parsed as RFC3339. | ||
Time *time.Time | ||
// Dur is the offset, set when the input parsed as a duration; PrometheusTimestamp adds it to time.Now(). | ||
Dur *model.Duration | ||
} | ||
|
||
// Set converts string to TimeOrDurationValue. | ||
func (tdv *TimeOrDurationValue) Set(s string) error { | ||
t, err := time.Parse(time.RFC3339, s) | ||
if err == nil { | ||
tdv.Time = &t | ||
return nil | ||
} | ||
|
||
// error parsing time, let's try duration. | ||
var minus bool | ||
if s[0] == '-' { | ||
minus = true | ||
s = s[1:] | ||
} | ||
dur, err := model.ParseDuration(s) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if minus { | ||
dur = dur * -1 | ||
} | ||
tdv.Dur = &dur | ||
return nil | ||
} | ||
|
||
// String returns either tume or duration. | ||
func (tdv *TimeOrDurationValue) String() string { | ||
switch { | ||
case tdv.Time != nil: | ||
return tdv.Time.String() | ||
case tdv.Dur != nil: | ||
return tdv.Dur.String() | ||
} | ||
|
||
return "nil" | ||
} | ||
|
||
// PrometheusTimestamp returns TimeOrDurationValue converted to PrometheusTimestamp | ||
// if duration is set now+duration is converted to Timestamp. | ||
func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 { | ||
switch { | ||
case tdv.Time != nil: | ||
return timestamp.FromTime(*tdv.Time) | ||
case tdv.Dur != nil: | ||
return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.Dur))) | ||
} | ||
|
||
return 0 | ||
} | ||
|
||
// TimeOrDuration helper for parsing TimeOrDuration with kingpin. | ||
func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue { | ||
value := new(TimeOrDurationValue) | ||
flags.SetValue(value) | ||
return value | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package model_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/prometheus/prometheus/pkg/timestamp" | ||
"github.com/thanos-io/thanos/pkg/model" | ||
"github.com/thanos-io/thanos/pkg/testutil" | ||
"gopkg.in/alecthomas/kingpin.v2" | ||
) | ||
|
||
func TestTimeOrDurationValue(t *testing.T) { | ||
cmd := kingpin.New("test", "test") | ||
|
||
minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve")) | ||
|
||
maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve"). | ||
Default("9999-12-31T23:59:59Z")) | ||
|
||
_, err := cmd.Parse([]string{"--min-time", "10s"}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
testutil.Equals(t, "10s", minTime.String()) | ||
testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String()) | ||
|
||
prevTime := timestamp.FromTime(time.Now()) | ||
afterTime := timestamp.FromTime(time.Now().Add(15 * time.Second)) | ||
|
||
testutil.Assert(t, minTime.PrometheusTimestamp() > prevTime, "minTime prometheus timestamp is less than time now.") | ||
testutil.Assert(t, minTime.PrometheusTimestamp() < afterTime, "minTime prometheus timestamp is more than time now + 15s") | ||
|
||
testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ import ( | |
"github.com/thanos-io/thanos/pkg/compact/downsample" | ||
"github.com/thanos-io/thanos/pkg/component" | ||
"github.com/thanos-io/thanos/pkg/extprom" | ||
"github.com/thanos-io/thanos/pkg/model" | ||
"github.com/thanos-io/thanos/pkg/objstore" | ||
"github.com/thanos-io/thanos/pkg/pool" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
|
@@ -182,6 +183,11 @@ type indexCache interface { | |
Series(b ulid.ULID, id uint64) ([]byte, bool) | ||
} | ||
|
||
// FilterConfig is a configuration, which Store uses for filtering metrics. | ||
type FilterConfig struct { | ||
MinTime, MaxTime model.TimeOrDurationValue | ||
} | ||
|
||
// BucketStore implements the store API backed by a bucket. It loads all index | ||
// files to local disk. | ||
type BucketStore struct { | ||
|
@@ -208,6 +214,8 @@ type BucketStore struct { | |
// samplesLimiter limits the number of samples per each Series() call. | ||
samplesLimiter *Limiter | ||
partitioner partitioner | ||
|
||
filterConfig *FilterConfig | ||
} | ||
|
||
// NewBucketStore creates a new bucket backed store that implements the store API against | ||
|
@@ -223,6 +231,7 @@ func NewBucketStore( | |
maxConcurrent int, | ||
debugLogging bool, | ||
blockSyncConcurrency int, | ||
filterConf *FilterConfig, | ||
) (*BucketStore, error) { | ||
if logger == nil { | ||
logger = log.NewNopLogger() | ||
|
@@ -256,6 +265,7 @@ func NewBucketStore( | |
), | ||
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), | ||
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, | ||
filterConfig: filterConf, | ||
} | ||
s.metrics = metrics | ||
|
||
|
@@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { | |
if err != nil { | ||
return nil | ||
} | ||
|
||
inRange, err := s.isBlockInMinMaxRange(ctx, id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the problem here is that we hit the issue of partial blocks... Something that #1394 touches. I thought we actually handle this in store gw properly (blocks without meta are excluded), seems not so not a regression. In any case we will just warn so 👍 |
||
if err != nil { | ||
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err) | ||
return nil | ||
} | ||
|
||
if !inRange { | ||
return nil | ||
} | ||
|
||
allIDs[id] = struct{}{} | ||
|
||
if b := s.getBlock(id); b != nil { | ||
|
@@ -377,6 +398,31 @@ func (s *BucketStore) numBlocks() int { | |
return len(s.blocks) | ||
} | ||
|
||
// isBlockInMinMaxRange reports whether the block with the given id overlaps | ||
// the time range configured in s.filterConfig. It builds a bucketBlock that | ||
// carries only logger, bucket, id and dir, calls loadMeta on it, and compares | ||
// the loaded meta's MinTime/MaxTime against the filter bounds. | ||
// A loadMeta failure is returned as (false, err). | ||
// NOTE(review): loadMeta presumably fetches meta.json into dir; confirm. | ||
func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) { | ||
dir := filepath.Join(s.dir, id.String()) | ||
|
||
b := &bucketBlock{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Allocating the whole block for quick check is not great. I think we should just extract Also at some point we need some meta syncer/ matcher that deals with all of this, something similar to #1394 ): Also even with time slicing looks like we will download all meta.json files to disk. Is that what we want? Essentially time slices will not include meta.json files on disk. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Function extracted. Good question, IMO those meta.json files are small in size, so why not? It will speed up our next meta.json lookup. |
||
logger: s.logger, | ||
bucket: s.bucket, | ||
id: id, | ||
dir: dir, | ||
} | ||
if err := b.loadMeta(ctx, id); err != nil { | ||
return false, err | ||
} | ||
|
||
// Reject blocks that end at or before the filter's min time, or start at | ||
// or after its max time; everything else overlaps the configured range. | ||
// Each bound is read with a separate PrometheusTimestamp() call. | ||
povilasv marked this conversation as resolved.
Show resolved
Hide resolved
|
||
switch { | ||
case b.meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp(): | ||
return false, nil | ||
|
||
case b.meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp(): | ||
return false, nil | ||
} | ||
|
||
return true, nil | ||
} | ||
|
||
func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { | ||
s.mtx.RLock() | ||
defer s.mtx.RUnlock() | ||
|
@@ -468,6 +514,10 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { | |
maxt = b.meta.MaxTime | ||
} | ||
} | ||
|
||
mint = s.normalizeMinTime(mint) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that it would be better to put "normalization" code right here, it would be more easy to read There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but we do it in 2 places.. I think we just need to rename to something clear. |
||
maxt = s.normalizeMaxTime(maxt) | ||
|
||
return mint, maxt | ||
} | ||
|
||
|
@@ -482,6 +532,26 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info | |
}, nil | ||
} | ||
|
||
func (s *BucketStore) normalizeMinTime(mint int64) int64 { | ||
povilasv marked this conversation as resolved.
Show resolved
Hide resolved
|
||
filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp() | ||
|
||
if mint < filterMinTime { | ||
return filterMinTime | ||
} | ||
|
||
return mint | ||
} | ||
|
||
func (s *BucketStore) normalizeMaxTime(maxt int64) int64 { | ||
filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp() | ||
|
||
if maxt > filterMaxTime { | ||
maxt = filterMaxTime | ||
} | ||
|
||
return maxt | ||
} | ||
|
||
type seriesEntry struct { | ||
lset []storepb.Label | ||
refs []uint64 | ||
|
@@ -722,6 +792,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie | |
if err != nil { | ||
return status.Error(codes.InvalidArgument, err.Error()) | ||
} | ||
// Adjust Request MinTime based on filters. | ||
req.MinTime = s.normalizeMinTime(req.MinTime) | ||
req.MaxTime = s.normalizeMaxTime(req.MaxTime) | ||
|
||
var ( | ||
stats = &queryStats{} | ||
g run.Group | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default("0000-01-01T00:00:00Z"))
Is that the lowest min time? What about -9999-12-31T23:59:59Z?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gives:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would set some explicit default value here (for
""
value), but we can do it in next PRs (: