Skip to content

Commit

Permalink
[new] promtail: add readline rate limit (#5031)
Browse files Browse the repository at this point in the history
* [new] promtail: add readline rate limit

* [new] promtail: add readline rate limit #5031

* [new] promtail: add readline rate limit #5031

* [new] promtail: add readline rate limit #5031

* [new] promtail: add readline rate limit #5031

* Update clients/pkg/promtail/limit/config.go

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update clients/pkg/promtail/limit/config.go

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
liguozhong and cyriltovena authored Jan 6, 2022
1 parent 9a7f5a2 commit 6b377b4
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 6 deletions.
22 changes: 22 additions & 0 deletions clients/cmd/promtail/promtail-local-limit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
server:
http_listen_port: 9080
grpc_listen_port: 0

positions:
filename: /tmp/positions.yaml

clients:
- url: http://localhost:3100/loki/api/v1/push

scrape_configs:
- job_name: system
static_configs:
- targets:
- localhost
labels:
job: varlogs
__path__: /var/log/*log

limit_config:
readline_rate: 100
readline_burst: 200
35 changes: 29 additions & 6 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package stages

import (
"context"
"sync"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

"github.com/grafana/loki/clients/pkg/promtail/api"
)
Expand All @@ -16,11 +18,16 @@ type PipelineStages = []interface{}
// PipelineStage contains configuration for a single pipeline stage
type PipelineStage = map[interface{}]interface{}

var rateLimiter *rate.Limiter
var rateLimiterDrop bool
var rateLimiterDropReason = "global_rate_limiter_drop"

// Pipeline pass down a log entry to each stage for mutation and/or label extraction.
type Pipeline struct {
logger log.Logger
stages []Stage
jobName *string
logger log.Logger
stages []Stage
jobName *string
dropCount *prometheus.CounterVec
}

// NewPipeline creates a new log entry pipeline from a configuration
Expand Down Expand Up @@ -48,9 +55,10 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist
}
}
return &Pipeline{
logger: log.With(logger, "component", "pipeline"),
stages: st,
jobName: jobName,
logger: log.With(logger, "component", "pipeline"),
stages: st,
jobName: jobName,
dropCount: getDropCountMetric(registerer),
}, nil
}

Expand Down Expand Up @@ -99,6 +107,16 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
go func() {
defer wg.Done()
for e := range pipelineOut {
if rateLimiter != nil {
if rateLimiterDrop {
if !rateLimiter.Allow() {
p.dropCount.WithLabelValues(rateLimiterDropReason).Inc()
continue
}
} else {
_ = rateLimiter.Wait(context.Background())
}
}
nextChan <- e.Entry
}
}()
Expand All @@ -122,3 +140,8 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
func (p *Pipeline) Size() int {
return len(p.stages)
}

func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool) {
rateLimiter = rate.NewLimiter(rate.Limit(rateVal), burstVal)
rateLimiterDrop = drop
}
3 changes: 3 additions & 0 deletions clients/pkg/promtail/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"fmt"

"github.com/grafana/loki/clients/pkg/promtail/limit"
yaml "gopkg.in/yaml.v2"

"github.com/grafana/loki/clients/pkg/promtail/client"
Expand All @@ -24,6 +25,7 @@ type Config struct {
PositionsConfig positions.Config `yaml:"positions,omitempty"`
ScrapeConfig []scrapeconfig.Config `yaml:"scrape_configs,omitempty"`
TargetConfig file.Config `yaml:"target_config,omitempty"`
LimitConfig limit.Config `yaml:"limit_config,omitempty"`
}

// RegisterFlags with prefix registers flags where every name is prefixed by
Expand All @@ -33,6 +35,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
c.ClientConfig.RegisterFlagsWithPrefix(prefix, f)
c.PositionsConfig.RegisterFlagsWithPrefix(prefix, f)
c.TargetConfig.RegisterFlagsWithPrefix(prefix, f)
c.LimitConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlags registers flags.
Expand Down
12 changes: 12 additions & 0 deletions clients/pkg/promtail/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ scrape_configs:
- localhost
labels:
job: varlogs
limit_config:
readline_rate: 100
readline_burst: 200
`

func Test_Load(t *testing.T) {
Expand All @@ -41,6 +44,15 @@ func Test_Load(t *testing.T) {
require.Nil(t, err)
}

func Test_RateLimitLoad(t *testing.T) {
var dst Config
err := yaml.Unmarshal([]byte(testFile), &dst)
require.Nil(t, err)
config := dst.LimitConfig
require.Equal(t, float64(100), config.ReadlineRate)
require.Equal(t, 200, config.ReadlineBurst)
}

func TestConfig_Setup(t *testing.T) {
for i, tt := range []struct {
in Config
Expand Down
19 changes: 19 additions & 0 deletions clients/pkg/promtail/limit/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package limit

import (
"flag"
)

type Config struct {
ReadlineRate float64 `yaml:"readline_rate" json:"readline_rate"`
ReadlineBurst int `yaml:"readline_burst" json:"readline_burst"`
ReadlineRateEnabled bool `yaml:"readline_rate_enabled,omitempty" json:"readline_rate_enabled"`
ReadlineRateDrop bool `yaml:"readline_rate_drop,omitempty" json:"readline_rate_drop"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Float64Var(&cfg.ReadlineRate, prefix+"limit.readline-rate", 10000, "promtail readline Rate.")
f.IntVar(&cfg.ReadlineBurst, prefix+"limit.readline-burst", 10000, "promtail readline Burst.")
f.BoolVar(&cfg.ReadlineRateEnabled, prefix+"limit.readline-rate-enabled", false, "Set to false to disable readline rate limit.")
f.BoolVar(&cfg.ReadlineRateDrop, prefix+"limit.readline-rate-drop", true, "Set to true to drop log when rate limit.")
}
4 changes: 4 additions & 0 deletions clients/pkg/promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/promtail/client"
Expand Down Expand Up @@ -57,6 +58,9 @@ func New(cfg config.Config, dryRun bool, opts ...Option) (*Promtail, error) {

cfg.Setup()

if cfg.LimitConfig.ReadlineRateEnabled {
stages.SetReadLineRateLimiter(cfg.LimitConfig.ReadlineRate, cfg.LimitConfig.ReadlineBurst, cfg.LimitConfig.ReadlineRateDrop)
}
var err error
if dryRun {
promtail.client, err = client.NewLogger(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfigs...)
Expand Down

0 comments on commit 6b377b4

Please sign in to comment.