diff --git a/clients/cmd/promtail/promtail-local-limit-config.yaml b/clients/cmd/promtail/promtail-local-limit-config.yaml new file mode 100644 index 0000000000000..935721b4b608a --- /dev/null +++ b/clients/cmd/promtail/promtail-local-limit-config.yaml @@ -0,0 +1,22 @@ +server: + http_listen_port: 9080 + grpc_listen_port: 0 + +positions: + filename: /tmp/positions.yaml + +clients: + - url: http://localhost:3100/loki/api/v1/push + +scrape_configs: +- job_name: system + static_configs: + - targets: + - localhost + labels: + job: varlogs + __path__: /var/log/*log + +limit_config: + readline_rate: 100 + readline_burst: 200 diff --git a/clients/pkg/logentry/stages/pipeline.go b/clients/pkg/logentry/stages/pipeline.go index fa2968aaa6aa9..412d514fb1398 100644 --- a/clients/pkg/logentry/stages/pipeline.go +++ b/clients/pkg/logentry/stages/pipeline.go @@ -1,11 +1,13 @@ package stages import ( + "context" "sync" "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" "github.com/grafana/loki/clients/pkg/promtail/api" ) @@ -16,11 +18,16 @@ type PipelineStages = []interface{} // PipelineStage contains configuration for a single pipeline stage type PipelineStage = map[interface{}]interface{} +var rateLimiter *rate.Limiter +var rateLimiterDrop bool +var rateLimiterDropReason = "global_rate_limiter_drop" + // Pipeline pass down a log entry to each stage for mutation and/or label extraction. type Pipeline struct { - logger log.Logger - stages []Stage - jobName *string + logger log.Logger + stages []Stage + jobName *string + dropCount *prometheus.CounterVec } // NewPipeline creates a new log entry pipeline from a configuration @@ -48,9 +55,10 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist } } return &Pipeline{ - logger: log.With(logger, "component", "pipeline"), - stages: st, - jobName: jobName, + logger: log.With(logger, "component", "pipeline"), + stages: st, + jobName: jobName, + dropCount: getDropCountMetric(registerer), }, nil } @@ -99,6 +107,16 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { go func() { defer wg.Done() for e := range pipelineOut { + if rateLimiter != nil { + if rateLimiterDrop { + if !rateLimiter.Allow() { + p.dropCount.WithLabelValues(rateLimiterDropReason).Inc() + continue + } + } else { + _ = rateLimiter.Wait(context.Background()) + } + } nextChan <- e.Entry } }() @@ -122,3 +140,8 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { func (p *Pipeline) Size() int { return len(p.stages) } + +func SetReadLineRateLimiter(rateVal float64, burstVal int, drop bool) { + rateLimiter = rate.NewLimiter(rate.Limit(rateVal), burstVal) + rateLimiterDrop = drop +} diff --git a/clients/pkg/promtail/config/config.go b/clients/pkg/promtail/config/config.go index 90bd3344c6e39..53426b2eeac2a 100644 --- a/clients/pkg/promtail/config/config.go +++ b/clients/pkg/promtail/config/config.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" + "github.com/grafana/loki/clients/pkg/promtail/limit" yaml "gopkg.in/yaml.v2" "github.com/grafana/loki/clients/pkg/promtail/client" @@ -24,6 +25,7 @@ type Config struct { PositionsConfig positions.Config `yaml:"positions,omitempty"` ScrapeConfig []scrapeconfig.Config `yaml:"scrape_configs,omitempty"` TargetConfig file.Config `yaml:"target_config,omitempty"` + LimitConfig limit.Config `yaml:"limit_config,omitempty"` } // RegisterFlags with prefix registers flags where every name is prefixed by @@ -33,6 +35,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { c.ClientConfig.RegisterFlagsWithPrefix(prefix, f) c.PositionsConfig.RegisterFlagsWithPrefix(prefix, f) c.TargetConfig.RegisterFlagsWithPrefix(prefix, f) + c.LimitConfig.RegisterFlagsWithPrefix(prefix, f) } // RegisterFlags registers flags. diff --git a/clients/pkg/promtail/config/config_test.go b/clients/pkg/promtail/config/config_test.go index 661b7b96ae845..6c02f38747283 100644 --- a/clients/pkg/promtail/config/config_test.go +++ b/clients/pkg/promtail/config/config_test.go @@ -33,6 +33,9 @@ scrape_configs: - localhost labels: job: varlogs +limit_config: + readline_rate: 100 + readline_burst: 200 ` func Test_Load(t *testing.T) { @@ -41,6 +44,15 @@ func Test_Load(t *testing.T) { require.Nil(t, err) } +func Test_RateLimitLoad(t *testing.T) { + var dst Config + err := yaml.Unmarshal([]byte(testFile), &dst) + require.Nil(t, err) + config := dst.LimitConfig + require.Equal(t, float64(100), config.ReadlineRate) + require.Equal(t, 200, config.ReadlineBurst) +} + func TestConfig_Setup(t *testing.T) { for i, tt := range []struct { in Config diff --git a/clients/pkg/promtail/limit/config.go b/clients/pkg/promtail/limit/config.go new file mode 100644 index 0000000000000..6ecef6fdd523e --- /dev/null +++ b/clients/pkg/promtail/limit/config.go @@ -0,0 +1,19 @@ +package limit + +import ( + "flag" +) + +type Config struct { + ReadlineRate float64 `yaml:"readline_rate" json:"readline_rate"` + ReadlineBurst int `yaml:"readline_burst" json:"readline_burst"` + ReadlineRateEnabled bool `yaml:"readline_rate_enabled,omitempty" json:"readline_rate_enabled"` + ReadlineRateDrop bool `yaml:"readline_rate_drop,omitempty" json:"readline_rate_drop"` +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.Float64Var(&cfg.ReadlineRate, prefix+"limit.readline-rate", 10000, "promtail readline Rate.") + f.IntVar(&cfg.ReadlineBurst, prefix+"limit.readline-burst", 10000, "promtail readline Burst.") + f.BoolVar(&cfg.ReadlineRateEnabled, prefix+"limit.readline-rate-enabled", false, "Set to false to disable readline rate limit.") + f.BoolVar(&cfg.ReadlineRateDrop, prefix+"limit.readline-rate-drop", true, "Set to true to drop log when rate limit.") +} diff --git a/clients/pkg/promtail/promtail.go b/clients/pkg/promtail/promtail.go index e058a24d74da1..b8f285af465c5 100644 --- a/clients/pkg/promtail/promtail.go +++ b/clients/pkg/promtail/promtail.go @@ -5,6 +5,7 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log" + "github.com/grafana/loki/clients/pkg/logentry/stages" "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/clients/pkg/promtail/client" @@ -57,6 +58,9 @@ func New(cfg config.Config, dryRun bool, opts ...Option) (*Promtail, error) { cfg.Setup() + if cfg.LimitConfig.ReadlineRateEnabled { + stages.SetReadLineRateLimiter(cfg.LimitConfig.ReadlineRate, cfg.LimitConfig.ReadlineBurst, cfg.LimitConfig.ReadlineRateDrop) + } var err error if dryRun { promtail.client, err = client.NewLogger(prometheus.DefaultRegisterer, promtail.logger, cfg.ClientConfigs...)