Skip to content

Commit

Permalink
limits: limits implementation for loki (#948)
Browse files Browse the repository at this point in the history
* limits implementation for loki

Added following new limits:

Length of query
Number of active streams
Number of stream matchers per query
Ingestion rate

* periodically reload ingestion rate limits to apply latest rates from overrides
  • Loading branch information
sandeepsukhani authored Sep 23, 2019
1 parent b30c0d3 commit ec5bc70
Show file tree
Hide file tree
Showing 43 changed files with 775 additions and 2,978 deletions.
21 changes: 11 additions & 10 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/grafana/loki/pkg/util/validation"
)

func init() {
Expand Down
98 changes: 86 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@ import (
"context"
"flag"
"net/http"
"sync"
"sync/atomic"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

const metricName = "logs"
const (
metricName = "logs"
bytesInMB = 1048576
)

var readinessProbeSuccess = []byte("Ready")
var (
Expand Down Expand Up @@ -55,10 +62,13 @@ var (
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)

// LimiterReloadPeriod is the interval at which the distributor discards its
// cached per-user ingestion limiters so that they are rebuilt from the
// current overrides on next use. A value of zero disables the reload loop.
LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period"`
}

// RegisterFlags binds the distributor's command-line options to the given
// flag set; parsed values are stored directly into cfg.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	const (
		reloadFlag    = "distributor.limiter-reload-period"
		reloadDefault = 5 * time.Minute
		reloadUsage   = "Period at which to reload user ingestion limits."
	)

	f.DurationVar(&cfg.LimiterReloadPeriod, reloadFlag, reloadDefault, reloadUsage)
}

// Distributor coordinates replication and distribution of log streams.
Expand All @@ -68,6 +78,10 @@ type Distributor struct {
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
quit chan struct{}
}

// New creates a distributor.
Expand All @@ -79,13 +93,44 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *val
}
}

return &Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
}, nil
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
quit: make(chan struct{}),
}

go d.loop()

return &d, nil
}

// loop runs until quit is closed, and on every tick of LimiterReloadPeriod
// replaces the per-user ingestion limiter cache with an empty map. Limiters
// are then lazily recreated, which is how updated rates take effect.
//
// If the configured period is not positive the loop exits immediately and
// limiters are never reloaded.
func (d *Distributor) loop() {
	// time.NewTicker panics on a non-positive duration, so a negative period
	// must be treated the same as zero: reloading is disabled.
	if d.cfg.LimiterReloadPeriod <= 0 {
		return
	}

	ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Swap in a fresh map (pre-sized to the previous population)
			// rather than mutating limiters that callers may still hold.
			d.ingestLimitersMtx.Lock()
			d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
			d.ingestLimitersMtx.Unlock()

		case <-d.quit:
			return
		}
	}
}

// Stop signals the background limiter-reload loop to exit by closing the
// quit channel.
//
// Calling Stop again after it has returned is a no-op. Stop is not safe to
// call from multiple goroutines at the same time: two concurrent callers can
// both observe the channel as open and both attempt to close it.
func (d *Distributor) Stop() {
	select {
	case <-d.quit:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(d.quit)
	}
}

// TODO taken from Cortex, see if we can refactor out a usable interface.
Expand Down Expand Up @@ -145,6 +190,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
validatedSamplesSize := 0

for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
Expand All @@ -153,13 +200,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
if err := cortex_validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
validationErr = err
continue
}
entries = append(entries, entry)
validatedSamplesSize += len(entry.Line)
}

if len(entries) == 0 {
Expand All @@ -176,6 +224,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

limiter := d.getOrCreateIngestLimiter(userID)
if !limiter.AllowN(time.Now(), validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), lineCount)
}

replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
Expand Down Expand Up @@ -226,7 +282,7 @@ func (d *Distributor) validateLabels(userID, labels string) error {
}

// everything in `ValidateLabels` returns `httpgrpc.Errorf` errors, no sugaring needed
return validation.ValidateLabels(d.overrides, userID, ls)
return cortex_validation.ValidateLabels(d.overrides, userID, ls)
}

// TODO taken from Cortex, see if we can refactor out a usable interface.
Expand Down Expand Up @@ -287,3 +343,21 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	// The request is ignored: the reply is unconditionally SERVING.
	resp := &grpc_health_v1.HealthCheckResponse{
		Status: grpc_health_v1.HealthCheckResponse_SERVING,
	}
	return resp, nil
}

// getOrCreateIngestLimiter returns the cached ingestion rate limiter for
// userID, creating and caching one from the user's overrides on first use.
//
// The rate and burst are the user's IngestionRate and IngestionBurstSize
// overrides multiplied by bytesInMB.
//
// All concurrent callers for the same user receive the same limiter, as long
// as the cache is not reset in between: the cache is re-checked under the
// write lock so that a limiter stored by a racing goroutine is never
// overwritten.
func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
	// Fast path: the limiter usually exists already, so a read lock suffices.
	d.ingestLimitersMtx.RLock()
	limiter, ok := d.ingestLimiters[userID]
	d.ingestLimitersMtx.RUnlock()

	if ok {
		return limiter
	}

	// Build the candidate outside the write lock to keep the critical section short.
	limiter = rate.NewLimiter(rate.Limit(int64(d.overrides.IngestionRate(userID)*bytesInMB)), int(d.overrides.IngestionBurstSize(userID)*bytesInMB))

	d.ingestLimitersMtx.Lock()
	defer d.ingestLimitersMtx.Unlock()

	// Another goroutine may have stored a limiter between the read-unlock and
	// this lock. Prefer that one so every request for the user is accounted
	// against a single shared limiter.
	if existing, found := d.ingestLimiters[userID]; found {
		return existing
	}

	d.ingestLimiters[userID] = limiter
	return limiter
}
55 changes: 49 additions & 6 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,72 @@ package distributor
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)

const (
numIngesters = 5
ingestionRateLimit = 0.000096 // 100 Bytes/s limit
)

const numIngesters = 5
var (
success = &logproto.PushResponse{}
ctx = user.InjectOrgID(context.Background(), "test")
)

// TestDistributor pushes requests of different sizes through a freshly
// prepared Distributor and checks the response and error. The distributor's
// ingestion rate and burst are both set to ingestionRateLimit by prepare, so
// the small request is accepted and the large one is rejected with a 429.
func TestDistributor(t *testing.T) {
	for i, tc := range []struct {
		samples          int
		expectedResponse *logproto.PushResponse
		expectedError    error
	}{
		{
			samples:          10,
			expectedResponse: success,
		},
		{
			samples:       100,
			expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100) exceeded while adding 100 lines"),
		},
	} {
		t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.samples), func(t *testing.T) {
			d := prepare(t)
			// New starts a background reload goroutine; stop it so each
			// subtest does not leak a goroutine and its ticker.
			defer d.Stop()

			request := makeWriteRequest(tc.samples)
			response, err := d.Push(ctx, request)
			assert.Equal(t, tc.expectedResponse, response)
			assert.Equal(t, tc.expectedError, err)
		})
	}
}

func prepare(t *testing.T) *Distributor {
var (
distributorConfig Config
defaultLimits validation.Limits
clientConfig client.Config
)
flagext.DefaultValues(&distributorConfig, &defaultLimits, &clientConfig)
defaultLimits.EnforceMetricName = false
defaultLimits.IngestionRate = ingestionRateLimit
defaultLimits.IngestionBurstSize = ingestionRateLimit

limits, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)
Expand All @@ -54,22 +94,25 @@ func TestDistributor(t *testing.T) {
d, err := New(distributorConfig, clientConfig, r, limits)
require.NoError(t, err)

return d
}

func makeWriteRequest(samples int) *logproto.PushRequest {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
Labels: `{foo="bar"}`,
},
},
}
for i := 0; i < 10; i++ {

for i := 0; i < samples; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = d.Push(user.InjectOrgID(context.Background(), "test"), &req)
require.NoError(t, err)
return &req
}

type mockIngester struct {
Expand Down
Loading

0 comments on commit ec5bc70

Please sign in to comment.