diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bd4027159ee..5864208dc6c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -571,6 +571,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012] - Honor kube event resysncs to handle missed watch events {pull}22668[22668] - Add autodiscover provider and metadata processor for Nomad. {pull}14954[14954] {pull}23324[23324] +- Add `processors.rate_limit.n.dropped` monitoring counter metric for the `rate_limit` processor. {pull}23330[23330] *Auditbeat* @@ -992,4 +993,3 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - diff --git a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc index f629874ad3b..9a7a2f32322 100644 --- a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc +++ b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc @@ -9,6 +9,9 @@ beta[] The `rate_limit` processor limits the throughput of events based on the specified configuration. +In the current implementation, rate-limited events are dropped. Future +implementations may allow rate-limited events to be handled differently. + [source,yaml] ----------------------------------------------------- processors: diff --git a/libbeat/processors/ratelimit/rate_limit.go b/libbeat/processors/ratelimit/rate_limit.go index 210ac5a5912..b1cd10e31fe 100644 --- a/libbeat/processors/ratelimit/rate_limit.go +++ b/libbeat/processors/ratelimit/rate_limit.go @@ -20,6 +20,7 @@ package ratelimit import ( "fmt" "sort" + "strconv" "github.com/jonboulle/clockwork" "github.com/mitchellh/hashstructure" @@ -27,20 +28,32 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/processors" ) +// instanceID is used to assign each instance a unique monitoring namespace. +var instanceID = atomic.MakeUint32(0) + +const processorName = "rate_limit" +const logName = "processor." + processorName + func init() { processors.RegisterPlugin(processorName, new) } -const processorName = "rate_limit" +type metrics struct { + Dropped *monitoring.Int +} type rateLimit struct { config config algorithm algorithm - logger *logp.Logger + + logger *logp.Logger + metrics metrics } // new constructs a new rate limit processor. @@ -63,10 +76,20 @@ func new(cfg *common.Config) (processors.Processor, error) { return nil, errors.Wrap(err, "could not construct rate limiting algorithm") } + // Logging and metrics (each processor instance has a unique ID). + var ( + id = int(instanceID.Inc()) + log = logp.NewLogger(logName).With("instance_id", id) + reg = monitoring.Default.NewRegistry(logName+"."+strconv.Itoa(id), monitoring.DoNotReport) + ) + p := &rateLimit{ config: config, algorithm: algo, - logger: logp.NewLogger("rate_limit"), + logger: log, + metrics: metrics{ + Dropped: monitoring.NewInt(reg, "dropped"), + }, } p.setClock(clockwork.NewRealClock()) @@ -87,6 +110,7 @@ func (p *rateLimit) Run(event *beat.Event) (*beat.Event, error) { } p.logger.Debugf("event [%v] dropped by rate_limit processor", event) + p.metrics.Dropped.Inc() return nil, nil } diff --git a/libbeat/processors/ratelimit/rate_limit_test.go b/libbeat/processors/ratelimit/rate_limit_test.go index 99941ebfd0f..17b70aa9477 100644 --- a/libbeat/processors/ratelimit/rate_limit_test.go +++ b/libbeat/processors/ratelimit/rate_limit_test.go @@ -61,32 +61,23 @@ func TestNew(t *testing.T) { } func TestRateLimit(t *testing.T) { - inEvents := []beat.Event{ - { + var inEvents []beat.Event + for i := 1; i <= 6; i++ { + event := beat.Event{ Timestamp: time.Now(), Fields: common.MapStr{ - "foo": "bar", + "event_number": i, }, - }, - { - Timestamp: time.Now(), - Fields: common.MapStr{ - "foo": "bar", - "baz": "mosquito", - }, - }, - { - Timestamp: time.Now(), - Fields: common.MapStr{ - "baz": "qux", - }, - }, - { - Timestamp: time.Now(), - Fields: common.MapStr{ - "foo": "seger", - }, - }, + } + inEvents = append(inEvents, event) + } + + withField := func(in beat.Event, key string, value interface{}) beat.Event { + out := in + out.Fields = in.Fields.Clone() + + out.Fields.Put(key, value) + return out } cases := map[string]struct { @@ -114,9 +105,9 @@ func TestRateLimit(t *testing.T) { inEvents: inEvents, outEvents: inEvents[0:2], }, - "rate_5_per_min": { + "rate_6_per_min": { config: common.MapStr{ - "limit": "5/m", + "limit": "6/m", }, inEvents: inEvents, outEvents: inEvents, @@ -127,16 +118,25 @@ func TestRateLimit(t *testing.T) { }, delay: 200 * time.Millisecond, inEvents: inEvents, - outEvents: []beat.Event{inEvents[0], inEvents[1], inEvents[3]}, + outEvents: []beat.Event{inEvents[0], inEvents[1], inEvents[3], inEvents[5]}, }, "with_fields": { config: common.MapStr{ "limit": "1/s", "fields": []string{"foo"}, }, - delay: 400 * time.Millisecond, - inEvents: inEvents, - outEvents: []beat.Event{inEvents[0], inEvents[2], inEvents[3]}, + delay: 400 * time.Millisecond, + inEvents: []beat.Event{ + withField(inEvents[0], "foo", "bar"), + withField(inEvents[1], "foo", "bar"), + inEvents[2], + withField(inEvents[3], "foo", "seger"), + }, + outEvents: []beat.Event{ + withField(inEvents[0], "foo", "bar"), + inEvents[2], + withField(inEvents[3], "foo", "seger"), + }, }, "with_burst": { config: common.MapStr{ @@ -160,7 +160,10 @@ func TestRateLimit(t *testing.T) { out := make([]beat.Event, 0) for _, in := range test.inEvents { - o, err := p.Run(&in) + inCopy := in + inCopy.Fields = in.Fields.Clone() + + o, err := p.Run(&inCopy) require.NoError(t, err) if o != nil { out = append(out, *o)