Skip to content

Commit

Permalink
limits implementation for loki
Browse files Browse the repository at this point in the history
Added following new limits:

Length of query
Number of active streams
Number of streams matcher per query
Ingestion rate
  • Loading branch information
sandeepsukhani committed Sep 18, 2019
1 parent 3b96510 commit be3ce52
Show file tree
Hide file tree
Showing 43 changed files with 735 additions and 2,974 deletions.
21 changes: 11 additions & 10 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/grafana/loki/pkg/util/validation"
)

func init() {
Expand Down
60 changes: 50 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,34 @@ import (
"context"
"flag"
"net/http"
"sync"
"sync/atomic"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"golang.org/x/time/rate"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

const metricName = "logs"
const (
metricName = "logs"
bytesInMB = 1048576
)

var readinessProbeSuccess = []byte("Ready")
var (
Expand Down Expand Up @@ -68,6 +75,9 @@ type Distributor struct {
ring ring.ReadRing
overrides *validation.Overrides
pool *cortex_client.Pool

ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
}

// New a distributor creates.
Expand All @@ -80,11 +90,12 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *val
}

return &Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
}, nil
}

Expand Down Expand Up @@ -145,6 +156,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
validatedSamplesSize := 0

for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
Expand All @@ -153,13 +166,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
if err := cortex_validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
validationErr = err
continue
}
entries = append(entries, entry)
validatedSamplesSize += len(entry.Line)
}

if len(entries) == 0 {
Expand All @@ -176,6 +190,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

limiter := d.getOrCreateIngestLimiter(userID)
if !limiter.AllowN(time.Now(), validatedSamplesSize) {
// Return a 4xx here to have the client discard the data and not retry. If a client
// is sending too much data consistently we will unlikely ever catch up otherwise.
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%d) exceeded while adding %d lines", int(limiter.Limit()), lineCount)
}

replicationSets, err := d.ring.BatchGet(keys, ring.Write)
if err != nil {
return nil, err
Expand Down Expand Up @@ -226,7 +248,7 @@ func (d *Distributor) validateLabels(userID, labels string) error {
}

// everything in `ValidateLabels` returns `httpgrpc.Errorf` errors, no sugaring needed
return validation.ValidateLabels(d.overrides, userID, ls)
return cortex_validation.ValidateLabels(d.overrides, userID, ls)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand Down Expand Up @@ -287,3 +309,21 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
d.ingestLimitersMtx.RLock()
limiter, ok := d.ingestLimiters[userID]
d.ingestLimitersMtx.RUnlock()

if ok {
return limiter
}

limiter = rate.NewLimiter(rate.Limit(int64(d.overrides.IngestionRate(userID)*bytesInMB)), int(d.overrides.IngestionBurstSize(userID)*bytesInMB))

d.ingestLimitersMtx.Lock()
d.ingestLimiters[userID] = limiter
d.ingestLimitersMtx.Unlock()

return limiter
}
55 changes: 49 additions & 6 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,72 @@ package distributor
import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)

const (
numIngesters = 5
ingestionRateLimit = 0.000096 // 100 Bytes/s limit
)

const numIngesters = 5
var (
success = &logproto.PushResponse{}
ctx = user.InjectOrgID(context.Background(), "test")
)

func TestDistributor(t *testing.T) {
for i, tc := range []struct {
samples int
expectedResponse *logproto.PushResponse
expectedError error
}{
{
samples: 10,
expectedResponse: success,
},
{
samples: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100) exceeded while adding 100 lines"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.samples), func(t *testing.T) {
d := prepare(t)

request := makeWriteRequest(tc.samples)
response, err := d.Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)
})
}
}

func prepare(t *testing.T) *Distributor {
var (
distributorConfig Config
defaultLimits validation.Limits
clientConfig client.Config
)
flagext.DefaultValues(&distributorConfig, &defaultLimits, &clientConfig)
defaultLimits.EnforceMetricName = false
defaultLimits.IngestionRate = ingestionRateLimit
defaultLimits.IngestionBurstSize = ingestionRateLimit

limits, err := validation.NewOverrides(defaultLimits)
require.NoError(t, err)
Expand All @@ -54,22 +94,25 @@ func TestDistributor(t *testing.T) {
d, err := New(distributorConfig, clientConfig, r, limits)
require.NoError(t, err)

return d
}

func makeWriteRequest(samples int) *logproto.PushRequest {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
Labels: `{foo="bar"}`,
},
},
}
for i := 0; i < 10; i++ {

for i := 0; i < samples; i++ {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = d.Push(user.InjectOrgID(context.Background(), "test"), &req)
require.NoError(t, err)
return &req
}

type mockIngester struct {
Expand Down
13 changes: 9 additions & 4 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util/flagext"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -65,7 +67,10 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
chunks: map[string][]chunk.Chunk{},
}

ing, err := New(cfg, client.Config{}, store)
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)

ing, err := New(cfg, client.Config{}, store, limits)
require.NoError(t, err)

return store, ing
Expand Down
Loading

0 comments on commit be3ce52

Please sign in to comment.