
Add an experimental flag to block samples with timestamp too far in the future #6195

Merged: 12 commits, Apr 10, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#6185](https://github.com/thanos-io/thanos/pull/6185) Tracing: tracing in OTLP supports configuring service_name.
- [#6192](https://github.com/thanos-io/thanos/pull/6192) Store: add flag `bucket-web-label` to select the label to use as timeline title in web UI
- [#6195](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock-skewed samples from polluting the TSDB head and blocking all valid incoming samples.

### Fixed

11 changes: 10 additions & 1 deletion cmd/thanos/receive.go
@@ -207,7 +207,10 @@ func runReceive(
conf.allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, conf.writerInterning)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{
Intern: conf.writerInterning,
TooFarInFutureTimeWindow: int64(time.Duration(*conf.tsdbTooFarInFutureTimeWindow)),
})

var limitsConfig *receive.RootLimitsConfig
if conf.writeLimitsConfig != nil {
@@ -774,6 +777,7 @@ type receiveConfig struct {

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
tsdbTooFarInFutureTimeWindow *model.Duration
tsdbOutOfOrderTimeWindow *model.Duration
tsdbOutOfOrderCapMax int64
tsdbAllowOverlappingBlocks bool
@@ -866,6 +870,11 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())

rc.tsdbTooFarInFutureTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.too-far-in-future.time-window",
Review comment: Maybe print an error and exit if the flag < 0.

@jnyi (Contributor, Author), Mar 9, 2023: The value of the flag should be a duration string like "0s", "5m", "1h", etc.

fpetkovski marked this conversation as resolved.
"[EXPERIMENTAL] Configures the allowed time window for ingesting samples too far in the future. Disabled (0s) by default"+
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration.",
).Default("0s").Hidden())
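
As the author notes in the thread above, the flag accepts a Prometheus-style duration string. A minimal sketch (illustrative only, not part of this diff) of how such a value flows from string to the nanosecond window the writer stores:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
)

func main() {
	// Parse the same format the flag accepts: "0s", "5m", "1h", ...
	d, err := model.ParseDuration("5m")
	if err != nil {
		panic(err)
	}
	// Mirrors int64(time.Duration(*conf.tsdbTooFarInFutureTimeWindow)) in receive.go:
	// model.Duration -> time.Duration -> int64 nanoseconds.
	window := int64(time.Duration(d))
	fmt.Println(window == int64(5*time.Minute)) // true
}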

Review comment: Is there a limit on the range of values we should allow?

@jnyi (Contributor, Author): Ditto.

Review comment (Contributor): What's the reason for making this flag hidden?

@jnyi (Contributor, Author): No particular reason; I thought it was a convention for experimental flags. Removed.


rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default"+
"Please note if you enable this option and you use compactor, make sure you have the --enable-vertical-compaction flag enabled, otherwise you might risk compactor halt.",
5 changes: 3 additions & 2 deletions pkg/receive/handler_test.go
@@ -164,6 +164,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
var (
cfg = []HashringConfig{{Hashring: "test"}}
handlers []*Handler
wOpts = &WriterOptions{}
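// wOpts stays zero-valued: interning off and a zero too-far-in-future window,
// so the new bound check is inactive in these handler tests.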
)
// create a fake peer group where we manually fill the cache with fake addresses pointed to our handlers
// This removes the network from the tests and creates a more consistent testing harness.
@@ -187,7 +188,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
ReplicaHeader: DefaultReplicaHeader,
ReplicationFactor: replicationFactor,
ForwardTimeout: 5 * time.Minute,
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i]), false),
Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i]), wOpts),
Limiter: limiter,
})
handlers = append(handlers, h)
@@ -948,7 +949,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
metadata.NoneFunc,
)
defer func() { testutil.Ok(b, m.Close()) }()
handler.writer = NewWriter(logger, m, false)
handler.writer = NewWriter(logger, m, &WriterOptions{})

testutil.Ok(b, m.Flush())
testutil.Ok(b, m.Open())
27 changes: 21 additions & 6 deletions pkg/receive/writer.go
@@ -5,10 +5,12 @@ package receive

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
@@ -26,17 +28,25 @@ type TenantStorage interface {
TenantAppendable(string) (Appendable, error)
}

type WriterOptions struct {
Intern bool
TooFarInFutureTimeWindow int64 // Unit: nanoseconds
}

type Writer struct {
logger log.Logger
multiTSDB TenantStorage
intern bool
opts *WriterOptions
}

func NewWriter(logger log.Logger, multiTSDB TenantStorage, intern bool) *Writer {
func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions) *Writer {
if opts == nil {
opts = &WriterOptions{}
}
return &Writer{
logger: logger,
multiTSDB: multiTSDB,
intern: intern,
opts: opts,
}
}
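
Because TooFarInFutureTimeWindow is a plain int64 holding nanoseconds, callers convert durations explicitly, as runReceive does above. An illustrative construction under that assumption (the helper below is hypothetical, not code from this diff):

package example

import (
	"time"

	"github.com/go-kit/log"
	"github.com/thanos-io/thanos/pkg/receive"
)

// newExampleWriter is a hypothetical helper showing the options wiring.
func newExampleWriter(multiTSDB receive.TenantStorage) *receive.Writer {
	return receive.NewWriter(log.NewNopLogger(), multiTSDB, &receive.WriterOptions{
		Intern:                   false,
		TooFarInFutureTimeWindow: int64(30 * time.Second), // a 30s window, expressed in nanoseconds
	})
}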

@@ -71,7 +81,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
return errors.Wrap(err, "get appender")
}
getRef := app.(storage.GetRef)

tooFarInFuture := model.Now().Add(time.Duration(r.opts.TooFarInFutureTimeWindow))
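// The units mix safely here: model.Now() is in milliseconds, and model.Time.Add
// takes a time.Duration (nanoseconds), converting internally.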
var (
ref storage.SeriesRef
errs writeErrors
@@ -105,13 +115,18 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
if ref == 0 {
// If not, copy labels, as TSDB will hold those strings long term. Given no
// copy unmarshal we don't want to keep memory for whole protobuf, only for labels.
labelpb.ReAllocZLabelsStrings(&t.Labels, r.intern)
labelpb.ReAllocZLabelsStrings(&t.Labels, r.opts.Intern)
lset = labelpb.ZLabelsToPromLabels(t.Labels)
}

// Append as many valid samples as possible, but keep track of the errors.
for _, s := range t.Samples {
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
if r.opts.TooFarInFutureTimeWindow != 0 && tooFarInFuture.Before(model.Time(s.Timestamp)) {
Review comment: Should this check r.opts.TooFarInFutureTimeWindow != 0?

Review comment: +1

@jnyi (Contributor, Author): Good catch.

// now + tooFarInFutureTimeWindow < sample timestamp
err = storage.ErrOutOfBounds

Review comment: Add a debug log showing how far out of bounds the sample is?

Review comment (Contributor): Could we even make it warning level? That is something that should fail a bit louder, I think, so errors can be investigated better. (A hedged sketch of such logging appears after this file's diff.)

} else {
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
}
switch err {
case storage.ErrOutOfOrderSample:
numSamplesOutOfOrder++
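A sketch of the louder logging discussed in the review thread above. It assumes the surrounding sample loop in (*Writer).Write and is illustrative, not the merged code:

// Hypothetical variant of the bound check that logs the observed skew
// before mapping the sample to ErrOutOfBounds.
if r.opts.TooFarInFutureTimeWindow != 0 && tooFarInFuture.Before(model.Time(s.Timestamp)) {
	// model.Time values are milliseconds, so the difference converts to a Duration via Millisecond.
	skew := time.Duration(model.Time(s.Timestamp)-tooFarInFuture) * time.Millisecond
	level.Warn(r.logger).Log(
		"msg", "rejected sample with timestamp too far in the future",
		"timestamp", s.Timestamp,
		"skew", skew.String(),
	)
	err = storage.ErrOutOfBounds
} else {
	ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
}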
44 changes: 41 additions & 3 deletions pkg/receive/writer_test.go
@@ -13,6 +13,7 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
@@ -27,12 +28,14 @@ import (
)

func TestWriter(t *testing.T) {
now := model.Now()
lbls := []labelpb.ZLabel{{Name: "__name__", Value: "test"}}
tests := map[string]struct {
reqs []*prompb.WriteRequest
expectedErr error
expectedIngested []prompb.TimeSeries
maxExemplars int64
opts *WriterOptions
}{
"should error out on series with no labels": {
reqs: []*prompb.WriteRequest{
@@ -122,6 +125,41 @@ func TestWriter(t *testing.T) {
},
},
},
"should succeed when sample timestamp is NOT too far in the future": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: lbls,
Samples: []prompb.Sample{{Value: 1, Timestamp: int64(now)}},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: lbls,
Samples: []prompb.Sample{{Value: 1, Timestamp: int64(now)}},
},
},
opts: &WriterOptions{TooFarInFutureTimeWindow: 30 * int64(time.Second)},
},
"should error out when sample timestamp is too far in the future": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: lbls,
// A sample with a very large timestamp in year 5138 (milliseconds)
Samples: []prompb.Sample{{Value: 1, Timestamp: 99999999999999}},
},
},
},
},
expectedErr: errors.Wrapf(storage.ErrOutOfBounds, "add 1 samples"),
opts: &WriterOptions{TooFarInFutureTimeWindow: 10000},
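// Note (illustrative): the 10000 here is nanoseconds (10µs), so the rejection
// bound is effectively "now"; any future-dated sample, let alone one in the
// year 5138, trips it.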
},
"should succeed on valid series with exemplars": {
reqs: []*prompb.WriteRequest{{
Timeseries: []prompb.TimeSeries{
@@ -299,7 +337,7 @@ func TestWriter(t *testing.T) {
return err
}))

w := NewWriter(logger, m, false)
w := NewWriter(logger, m, testData.opts)

for idx, req := range testData.reqs {
err = w.Write(context.Background(), DefaultTenant, req)
@@ -398,7 +436,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
}

b.Run("without interning", func(b *testing.B) {
w := NewWriter(logger, m, false)
w := NewWriter(logger, m, &WriterOptions{Intern: false})

b.ReportAllocs()
b.ResetTimer()
Expand All @@ -409,7 +447,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr
})

b.Run("with interning", func(b *testing.B) {
w := NewWriter(logger, m, true)
w := NewWriter(logger, m, &WriterOptions{Intern: true})

b.ReportAllocs()
b.ResetTimer()