Skip to content

Commit

Permalink
periodically reload ingestion rate limits to apply latest rates from …
Browse files Browse the repository at this point in the history
…overrides
  • Loading branch information
sandeepsukhani committed Sep 18, 2019
1 parent be3ce52 commit eebcef8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 20 deletions.
38 changes: 36 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ var (
type Config struct {
// For testing.
factory func(addr string) (grpc_health_v1.HealthClient, error)

LimiterReloadPeriod time.Duration
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
}

// Distributor coordinates replicates and distribution of log streams.
Expand All @@ -78,6 +81,7 @@ type Distributor struct {

ingestLimitersMtx sync.RWMutex
ingestLimiters map[string]*rate.Limiter
quit chan struct{}
}

// New a distributor creates.
Expand All @@ -89,14 +93,44 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, overrides *val
}
}

return &Distributor{
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
ring: ring,
overrides: overrides,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, factory, cortex_util.Logger),
ingestLimiters: map[string]*rate.Limiter{},
}, nil
quit: make(chan struct{}),
}

go d.loop()

return &d, nil
}

func (d *Distributor) loop() {
if d.cfg.LimiterReloadPeriod == 0 {
return
}

ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
d.ingestLimitersMtx.Lock()
d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
d.ingestLimitersMtx.Unlock()

case <-d.quit:
return
}
}
}

func (d *Distributor) Stop() {
close(d.quit)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand Down
6 changes: 6 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ func (t *Loki) initDistributor() (err error) {
return
}

func (t *Loki) stopDistributor() (err error) {
t.distributor.Stop()
return nil
}

func (t *Loki) initQuerier() (err error) {
t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
if err != nil {
Expand Down Expand Up @@ -319,6 +324,7 @@ var modules = map[moduleName]module{
Distributor: {
deps: []moduleName{Ring, Server, Overrides},
init: (*Loki).initDistributor,
stop: (*Loki).stopDistributor,
},

Store: {
Expand Down
13 changes: 3 additions & 10 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/cortexproject/cortex/pkg/util"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -153,7 +151,7 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l

// Select Implements logql.Querier which select logs via matchers and regex filters.
func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.EntryIterator, error) {
err := q.validateQueryRequest(ctx, &req.QueryRequest)
err := q.validateQueryRequest(ctx, params.QueryRequest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,7 +277,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
},
}

err := q.validateQueryRequest(ctx, &histReq.QueryRequest)
err := q.validateQueryRequest(ctx, histReq.QueryRequest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -380,16 +378,11 @@ func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryR
return err
}

expr, err := logql.ParseExpr(req.Query)
matchers, err := logql.ParseMatchers(req.Selector)
if err != nil {
return err
}

if req.Regex != "" {
expr = logql.NewFilterExpr(expr, labels.MatchRegexp, req.Regex)
}

matchers := expr.Matchers()
maxStreamMatchersPerQuery := q.limits.MaxStreamsMatchersPerQuery(userID)
if len(matchers) > maxStreamMatchersPerQuery {
return httpgrpc.Errorf(http.StatusBadRequest,
Expand Down
13 changes: 7 additions & 6 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/logql"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -248,12 +250,11 @@ func defaultLimitsTestConfig() validation.Limits {

func TestQuerier_validateQueryRequest(t *testing.T) {
request := logproto.QueryRequest{
Query: "{type=\"test\", fail=\"yes\"}",
Selector: "{type=\"test\", fail=\"yes\"}",
Limit: 10,
Start: time.Now().Add(-1 * time.Minute),
End: time.Now(),
Direction: logproto.FORWARD,
Regex: "",
}

store := newStoreMock()
Expand Down Expand Up @@ -282,14 +283,14 @@ func TestQuerier_validateQueryRequest(t *testing.T) {

ctx := user.InjectOrgID(context.Background(), "test")

_, err = q.Query(ctx, &request)
_, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request})
require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "max streams matchers per query exceeded, matchers-count > limit (2 > 1)"), err)

request.Query = "{type=\"test\"}"
_, err = q.Query(ctx, &request)
request.Selector = "{type=\"test\"}"
_, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request})
require.NoError(t, err)

request.Start = request.End.Add(-3 * time.Minute)
_, err = q.Query(ctx, &request)
_, err = q.Select(ctx, logql.SelectParams{QueryRequest: &request})
require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, length > limit (3m0s > 2m0s)"), err)
}
2 changes: 0 additions & 2 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down

0 comments on commit eebcef8

Please sign in to comment.