diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index fbc571e6d14ce..e6e22f72f097e 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -11,13 +11,13 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/ring"
+	"github.com/grafana/dskit/tenant"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"golang.org/x/net/context"
-
-	"github.com/grafana/dskit/tenant"
+	"golang.org/x/time/rate"
 
 	"github.com/grafana/loki/v3/pkg/chunkenc"
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
@@ -30,6 +30,9 @@ const (
 	// position, not wallclock time.
 	flushBackoff = 1 * time.Second
 
+	// Lower bound on flushes per check period for the rate limiter
+	minFlushes = 100
+
 	nameLabel = "__name__"
 	logsValue = "logs"
 
@@ -99,13 +102,14 @@ func (o *flushOp) Priority() int64 {
 	return -int64(o.from)
 }
 
-// sweepUsers periodically schedules series for flushing and garbage collects users with no series
+// sweepUsers periodically schedules series for flushing and garbage collects users with no streams
 func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {
 	instances := i.getInstances()
 
 	for _, instance := range instances {
 		i.sweepInstance(instance, immediate, mayRemoveStreams)
 	}
+	i.setFlushRate()
 }
 
 func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) {
@@ -137,6 +141,24 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bo
 	})
 }
 
+// Compute a rate such as to spread calls to the store over nearly all of the flush period,
+// for example if we have 600 items in the queue and a period of 1 minute we will send 10.5 per second.
+// Note if the store can't keep up with this rate then it doesn't make any difference.
+func (i *Ingester) setFlushRate() {
+	totalQueueLength := 0
+	for _, q := range i.flushQueues {
+		totalQueueLength += q.Length()
+	}
+	const jitter = 1.05 // aim to finish a little bit before the end of the period
+	flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * jitter
+	// Avoid going very slowly with tiny queues
+	if flushesPerSecond*i.cfg.FlushCheckPeriod.Seconds() < minFlushes {
+		flushesPerSecond = minFlushes / i.cfg.FlushCheckPeriod.Seconds()
+	}
+	level.Debug(util_log.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond)
+	i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond))
+}
+
 func (i *Ingester) flushLoop(j int) {
 	l := log.With(i.logger, "loop", j)
 	defer func() {
@@ -151,8 +173,13 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
+		if !op.immediate {
+			_ = i.flushRateLimiter.Wait(context.Background())
+		}
+
 		m := util_log.WithUserID(op.userID, l)
 		err := i.flushOp(m, op)
+
 		if err != nil {
 			level.Error(m).Log("msg", "failed to flush", "err", err)
 		}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index dcfeb20b48a5a..af7f1fde288c9 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -33,6 +33,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
+	"golang.org/x/time/rate"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
 	server_util "github.com/grafana/loki/v3/pkg/util/server"
@@ -239,6 +240,9 @@ type Ingester struct {
 	flushQueues     []*util.PriorityQueue
 	flushQueuesDone sync.WaitGroup
 
+	// Spread out calls to the chunk store over the flush period
+	flushRateLimiter *rate.Limiter
+
 	limiter *Limiter
 
 	// Denotes whether the ingester should flush on shutdown.
@@ -294,6 +298,7 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
 		periodicConfigs:       store.GetSchemaConfigs(),
 		loopQuit:              make(chan struct{}),
 		flushQueues:           make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
+		flushRateLimiter:      rate.NewLimiter(rate.Inf, 1),
 		tailersQuit:           make(chan struct{}),
 		metrics:               metrics,
 		flushOnShutdownSwitch: &OnceSwitch{},
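
To make the new flush pacing easier to follow, below is a minimal, self-contained Go sketch of the same pattern built on golang.org/x/time/rate. The names setFlushRate, queueLen, and checkPeriod are illustrative stand-ins rather than the ingester's actual fields; only the rate formula and the SetLimit/Wait usage mirror the diff above.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// Lower bound on flushes per check period, mirroring minFlushes in the diff.
const minFlushes = 100

// setFlushRate spreads queueLen flushes over (almost) one check period.
func setFlushRate(limiter *rate.Limiter, queueLen int, checkPeriod time.Duration) {
	const jitter = 1.05 // aim to finish a little before the end of the period
	flushesPerSecond := float64(queueLen) / checkPeriod.Seconds() * jitter
	// Avoid going very slowly with tiny queues.
	if flushesPerSecond*checkPeriod.Seconds() < minFlushes {
		flushesPerSecond = minFlushes / checkPeriod.Seconds()
	}
	limiter.SetLimit(rate.Limit(flushesPerSecond))
}

func main() {
	// Start unthrottled with a burst of 1, as the ingester constructor does.
	limiter := rate.NewLimiter(rate.Inf, 1)

	// 600 queued items and a 1-minute period => 600/60*1.05 = 10.5 flushes/second.
	setFlushRate(limiter, 600, time.Minute)
	fmt.Println("limit:", limiter.Limit())

	// Worker loop: non-immediate flushes wait their turn at the limiter.
	for n := 0; n < 3; n++ {
		immediate := false
		if !immediate {
			_ = limiter.Wait(context.Background())
		}
		fmt.Println("flush", n, "at", time.Now().Format("15:04:05.000"))
	}
}

Creating the limiter with rate.NewLimiter(rate.Inf, 1) leaves flushes unthrottled until the first sweep computes a real rate, and the burst of 1 means each Wait releases at most one flush at a time, so queued work is spread over the check period rather than hitting the store in a burst.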