Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ingestion rate global limit support #1486

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
## master / unreleased

### Notable Changes
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Deprecated `-distributor.limiter-reload-period` flag / distributor's `limiter_reload_period` config option.

### Features

* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. In the case the positions yaml is corrupted an empty positions config will be used and should later overwrite the malformed yaml.
* [1486](https://github.com/grafana/loki/pull/1486) **pracucci**: Added `global` ingestion rate limiter strategy support.

# 1.2.0 (2019-12-09)

Expand Down
26 changes: 21 additions & 5 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ The `server_config` block configures Promtail's behavior as an HTTP server:
The `distributor_config` block configures the Loki Distributor.

```yaml
# Period at which to reload user ingestion limits.
[limiter_reload_period: <duration> | default = 5m]
# Configures the distributors ring, used when the "global" ingestion rate
# strategy is enabled.
[ring: <ring_config>]
```

## querier_config
Expand Down Expand Up @@ -716,12 +717,27 @@ The `limits_config` block configures global and per-tenant limits for ingesting
logs in Loki.

```yaml
# Whether the ingestion rate limit should be applied individually to each
# distributor instance (local), or evenly shared across the cluster (global).
# The ingestion rate strategy cannot be overridden on a per-tenant basis.
#
# - local: enforces the limit on a per distributor basis. The actual effective
# rate limit will be N times higher, where N is the number of distributor
# replicas.
# - global: enforces the limit globally, configuring a per-distributor local
# rate limiter as "ingestion_rate / N", where N is the number of distributor
# replicas (it's automatically adjusted if the number of replicas change).
# The global strategy requires the distributors to form their own ring, which
# is used to keep track of the current number of healthy distributor replicas.
[ingestion_rate_strategy: <string> | default = "local"]

# Per-user ingestion rate limit in sample size per second. Units in MB.
[ingestion_rate_mb: <float> | default = 4]

# Per-user allowed ingestion burst size (in sample size). Units in MB. Warning,
# very high limits will be reset every limiter_reload_period defined in
# distributor_config.
# Per-user allowed ingestion burst size (in sample size). Units in MB.
# The burst size refers to the per-distributor local rate limiter even in the
# case of the "global" strategy, and should be set at least to the maximum logs
# size expected in a single push request.
[ingestion_burst_size_mb: <int> | default = 6]

# Maximum length of a label name.
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ require (
golang.org/x/net v0.0.0-20190923162816-aa69164e4478
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/sys v0.0.0-20191218084908-4a24b4065292 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20190925134113-a044388aa56f // indirect
google.golang.org/appengine v1.6.3 // indirect
google.golang.org/genproto v0.0.0-20190916214212-f660b8655731 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/structtag v1.0.0/go.mod h1:IKitwq45uXL/yqi5mYghiD3w9H6eTOvI9vnk8tXMphA=
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c h1:QwbffUs/+ptC4kTFPEN9Ej2latTq3bZJ5HO/OwPXYMs=
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ=
github.com/fluent/fluent-logger-golang v1.2.1 h1:CMA+mw2zMiOGEOarZtaqM3GBWT1IVLNncNi0nKELtmU=
github.com/fluent/fluent-logger-golang v1.2.1/go.mod h1:2/HCT/jTy78yGyeNGQLGQsjF3zzzAuy6Xlk6FCMV5eU=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
Expand Down Expand Up @@ -563,6 +564,7 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8 h1:jkUFVqrKRttbdDqkTrvOmHxfqIsJK0Oe2WGi1ACAE+M=
github.com/philhofer/fwd v0.0.0-20160129035939-98c11a7a6ec8/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.3.1-0.20191115212037-9085dacd1e1e+incompatible h1:5isCJDRADbeSlWx6KVXAYwrcihyCGVXr7GNCdLEVDr8=
Expand Down Expand Up @@ -666,6 +668,7 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/thanos-io/thanos v0.8.1/go.mod h1:qQDi/6tgypn96+VzSumlxfJIgFX2y3ablfhHHLZ05cg=
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d h1:Ninez2SUm08xpmnw7kVxCeOc3DahF6IuMuRMCdM4wTQ=
github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
Expand All @@ -687,6 +690,7 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1 h1:qi+YkNiB7T3Ikw1DoDIFhdAPbDU7fUPDsKrUoZdupnQ=
github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1/go.mod h1:7gGdEUJaCrSlWi/mjd68CZv0sfqektYPDcro9cE+M9k=
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4 h1:O8BmyjqQoByXjAj6XaTfcxxqSIK6DYLmOSiYQPL9yJg=
github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4/go.mod h1:pSm+0KR57BG3pvGoJWFXJSAC7+sEPewcvdt5StevL3A=
Expand Down
120 changes: 51 additions & 69 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"flag"
"net/http"
"sync"
"sync/atomic"
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
Expand All @@ -30,7 +30,6 @@ import (

const (
metricName = "logs"
bytesInMB = 1048576
)

var readinessProbeSuccess = []byte("Ready")
Expand Down Expand Up @@ -60,77 +59,78 @@ var (

// Config for a Distributor.
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)
// Distributors ring
DistributorRing cortex_distributor.RingConfig `yaml:"ring,omitempty"`

LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period"`
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error) `yaml:"-"`
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
cfg.DistributorRing.RegisterFlags(f)
}

// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
cfg Config
clientCfg client.Config
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
quit chan struct{}
cfg Config
clientCfg client.Config
ingestersRing ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

// The global rate limiter requires a distributors ring to count
// the number of healthy instances.
distributorsRing *ring.Lifecycler

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
}

// New a distributor creates.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
}

d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
}
// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsRing *ring.Lifecycler

go d.loop()
if overrides.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
var err error
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey)
if err != nil {
return nil, err
}

return &d, nil
}
distributorsRing.Start()

func (d *Distributor) loop() {
if d.cfg.LimiterReloadPeriod == 0 {
return
ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsRing)
} else {
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
}

ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
d.ingestLimitersMtx.Lock()
d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
d.ingestLimitersMtx.Unlock()

case <-d.quit:
return
}
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}

return &d, nil
}

func (d *Distributor) Stop() {
close(d.quit)
if d.distributorsRing != nil {
d.distributorsRing.Shutdown()
}
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand All @@ -153,7 +153,7 @@ type pushTracker struct {
// ReadinessHandler is used to indicate to k8s when the distributor is ready.
// Returns 200 when the distributor is ready, 500 otherwise.
func (d *Distributor) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
_, err := d.ring.GetAll()
_, err := d.ingestersRing.GetAll()
if err != nil {
http.Error(w, "Not ready: "+err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -226,13 +226,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

limiter := d.getOrCreateIngestLimiter(userID)
if !limiter.AllowN(time.Now(), validatedSamplesSize) {
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), validatedSamplesCount)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d bytes) exceeded while adding %d lines for a total size of %d bytes", int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize)
}

const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck
Expand All @@ -241,7 +241,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
samplesByIngester := map[string][]*streamTracker{}
ingesterDescs := map[string]ring.IngesterDesc{}
for i, key := range keys {
replicationSet, err := d.ring.Get(key, ring.Write, descs[:0])
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -349,21 +349,3 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
d.ingestLimitersMtx.RLock()
limiter, ok := d.ingestLimiters[userID]
d.ingestLimitersMtx.RUnlock()

if ok {
return limiter
}

limiter = rate.NewLimiter(rate.Limit(int64(d.overrides.IngestionRate(userID)*bytesInMB)), int(d.overrides.IngestionBurstSize(userID)*bytesInMB))

d.ingestLimitersMtx.Lock()
d.ingestLimiters[userID] = limiter
d.ingestLimitersMtx.Unlock()

return limiter
}
Loading