Smooth out spikes in rate of chunk flush ops #3191

Merged 4 commits on Sep 30, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -60,6 +60,7 @@
* [ENHANCEMENT] Blocksconvert – Scanner: metrics for tracking progress. #3222
* [ENHANCEMENT] Blocksconvert – Builder: retry block upload before giving up. #3245
* [ENHANCEMENT] Hash ring: added instance registered timestamp to the ring. #3248
* [ENHANCEMENT] Reduce tail latency by smoothing out spikes in rate of chunk flush operations. #3191
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
* [BUGFIX] Handle hash-collisions in the query path. #3192
25 changes: 25 additions & 0 deletions pkg/ingester/flush.go
@@ -10,6 +10,7 @@ import (
ot "github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"golang.org/x/time/rate"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util"
@@ -19,6 +20,8 @@ const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
flushBackoff = 1 * time.Second
// Lower bound on flushes per check period for rate-limiter
minFlushes = 100
)

// Flush triggers a flush of all the chunks and closes the flush queues.
@@ -94,6 +97,25 @@ func (i *Ingester) sweepUsers(immediate bool) {
}

i.metrics.oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix()))
i.setFlushRate()
}

// Compute a rate that spreads calls to the store over nearly all of the flush period;
// for example, with 600 items in the queue and a 1-minute period we will send 10.5 per second
// (10/s plus the 5% fudge factor). Note that if the store can't keep up with this rate,
// the limiter makes no practical difference.
func (i *Ingester) setFlushRate() {
totalQueueLength := 0
for _, q := range i.flushQueues {
totalQueueLength += q.Length()
}
const fudge = 1.05 // aim to finish a little bit before the end of the period
flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * fudge
// Avoid going very slowly with tiny queues
if flushesPerSecond*i.cfg.FlushCheckPeriod.Seconds() < minFlushes {
flushesPerSecond = minFlushes / i.cfg.FlushCheckPeriod.Seconds()
}
level.Debug(util.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond)
i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond))
}

type flushReason int8
@@ -235,6 +257,9 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

if !op.immediate {
_ = i.flushRateLimiter.Wait(context.Background())
}
outcome, err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
i.metrics.seriesDequeuedOutcome.WithLabelValues(outcome.String()).Inc()
if err != nil {
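
For illustration, the rate computation in setFlushRate above can be reproduced as a small standalone sketch. This is a hypothetical, self-contained program (not part of the PR) that mirrors the same arithmetic, using the 600-item / 1-minute example from the comment:

package main

import (
	"fmt"
	"time"
)

// Constants mirroring the diff above: a 5% fudge factor and a lower bound
// of 100 flushes per check period.
const (
	fudge      = 1.05
	minFlushes = 100
)

// computeFlushRate reproduces the arithmetic of setFlushRate for a given
// total queue length and flush check period.
func computeFlushRate(totalQueueLength int, checkPeriod time.Duration) float64 {
	flushesPerSecond := float64(totalQueueLength) / checkPeriod.Seconds() * fudge
	// Clamp tiny queues so flushing never crawls along at a near-zero rate.
	if flushesPerSecond*checkPeriod.Seconds() < minFlushes {
		flushesPerSecond = minFlushes / checkPeriod.Seconds()
	}
	return flushesPerSecond
}

func main() {
	// 600 queued items over a 1-minute period: 10/s, or 10.5/s with the fudge factor.
	fmt.Println(computeFlushRate(600, time.Minute)) // 10.5
	// A tiny queue is clamped to the floor of 100 flushes per period, i.e. ~1.67/s.
	fmt.Println(computeFlushRate(10, time.Minute))
}
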
28 changes: 17 additions & 11 deletions pkg/ingester/ingester.go
@@ -18,6 +18,7 @@ import (
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"

cortex_chunk "github.com/cortexproject/cortex/pkg/chunk"
@@ -141,6 +142,9 @@ type Ingester struct {
flushQueues []*util.PriorityQueue
flushQueuesDone sync.WaitGroup

// Spread out calls to the chunk store over the flush period
flushRateLimiter *rate.Limiter

// This should never be nil.
wal WAL
// To be passed to the WAL.
@@ -196,11 +200,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer, true, cfg.ActiveSeriesMetricsEnabled),

limits: limits,
chunkStore: chunkStore,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
usersMetadata: map[string]*userMetricsMetadata{},
registerer: registerer,
limits: limits,
chunkStore: chunkStore,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
usersMetadata: map[string]*userMetricsMetadata{},
registerer: registerer,
}

var err error
@@ -275,12 +280,13 @@ func NewForFlusher(cfg Config, chunkStore ChunkStore, limits *validation.Overrid
}

i := &Ingester{
cfg: cfg,
metrics: newIngesterMetrics(registerer, true, false),
chunkStore: chunkStore,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
wal: &noopWAL{},
limits: limits,
cfg: cfg,
metrics: newIngesterMetrics(registerer, true, false),
chunkStore: chunkStore,
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
flushRateLimiter: rate.NewLimiter(rate.Inf, 1),
wal: &noopWAL{},
limits: limits,
}

i.BasicService = services.NewBasicService(i.startingForFlusher, i.loopForFlusher, i.stopping)
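
Both constructors initialize flushRateLimiter with rate.NewLimiter(rate.Inf, 1), so flushes are effectively unthrottled until the first sweep computes a real rate via SetLimit. A minimal standalone sketch of that behaviour, assuming only the golang.org/x/time/rate API used in the diff:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// An infinite rate means Wait returns immediately, so flushing runs at
	// full speed before the first sweep sets a concrete limit.
	limiter := rate.NewLimiter(rate.Inf, 1)

	start := time.Now()
	for i := 0; i < 5; i++ {
		_ = limiter.Wait(context.Background())
	}
	fmt.Println("unlimited:", time.Since(start)) // effectively zero

	// Once a sweep has computed a rate, SetLimit starts pacing subsequent Waits,
	// as setFlushRate does with the computed flushes-per-second value.
	limiter.SetLimit(rate.Limit(10)) // assume a computed rate of 10 ops/second
	start = time.Now()
	for i := 0; i < 5; i++ {
		_ = limiter.Wait(context.Background())
	}
	fmt.Println("limited:", time.Since(start)) // roughly 400-500ms as calls are paced
}
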