Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rate limit processor] Add counter metric for dropped events #23330

Merged
merged 9 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012]
- Honor kube event resysncs to handle missed watch events {pull}22668[22668]
- Add autodiscover provider and metadata processor for Nomad. {pull}14954[14954] {pull}23324[23324]
- Add optional `metric_field` setting to `rate_limit` processor. {pull}23330[23330]
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
- Add `processors.rate_limit.n.dropped` monitoring counter metric for the `rate_limit` processor. {pull}23330[23330]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/ratelimit/docs/rate_limit.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ beta[]
The `rate_limit` processor limits the throughput of events based on
the specified configuration.

In the current implementation, rate-limited events are dropped. Future
implementations may allow rate-limited events to be handled differently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is unrelated to this PR but since it's minor I thought I would include it in here. Let me know if you'd prefer to have it in its own PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok with adding it here, but maybe only the part of In the current implementation, rate-limited events are dropped., I don't think it is needed to document possible future implementations 🙂

[source,yaml]
-----------------------------------------------------
processors:
Expand Down
30 changes: 27 additions & 3 deletions libbeat/processors/ratelimit/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,40 @@ package ratelimit
import (
"fmt"
"sort"
"strconv"

"github.com/jonboulle/clockwork"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/processors"
)

// instanceID is used to assign each instance a unique monitoring namespace.
var instanceID = atomic.MakeUint32(0)

const processorName = "rate_limit"
const logName = "processor." + processorName

func init() {
processors.RegisterPlugin(processorName, new)
}

const processorName = "rate_limit"
type metrics struct {
Dropped *monitoring.Int
}

type rateLimit struct {
config config
algorithm algorithm
logger *logp.Logger

logger *logp.Logger
metrics metrics
}

// new constructs a new rate limit processor.
Expand All @@ -63,10 +76,20 @@ func new(cfg *common.Config) (processors.Processor, error) {
return nil, errors.Wrap(err, "could not construct rate limiting algorithm")
}

// Logging and metrics (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
log = logp.NewLogger(logName).With("instance_id", id)
reg = monitoring.Default.NewRegistry(logName+"."+strconv.Itoa(id), monitoring.DoNotReport)
)

p := &rateLimit{
config: config,
algorithm: algo,
logger: logp.NewLogger("rate_limit"),
logger: log,
metrics: metrics{
Dropped: monitoring.NewInt(reg, "dropped"),
},
}

p.setClock(clockwork.NewRealClock())
Expand All @@ -87,6 +110,7 @@ func (p *rateLimit) Run(event *beat.Event) (*beat.Event, error) {
}

p.logger.Debugf("event [%v] dropped by rate_limit processor", event)
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
p.metrics.Dropped.Inc()
return nil, nil
}

Expand Down
63 changes: 33 additions & 30 deletions libbeat/processors/ratelimit/rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,23 @@ func TestNew(t *testing.T) {
}

func TestRateLimit(t *testing.T) {
inEvents := []beat.Event{
{
var inEvents []beat.Event
for i := 1; i <= 6; i++ {
event := beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"foo": "bar",
"event_number": i,
},
},
{
Timestamp: time.Now(),
Fields: common.MapStr{
"foo": "bar",
"baz": "mosquito",
},
},
{
Timestamp: time.Now(),
Fields: common.MapStr{
"baz": "qux",
},
},
{
Timestamp: time.Now(),
Fields: common.MapStr{
"foo": "seger",
},
},
}
inEvents = append(inEvents, event)
}

withField := func(in beat.Event, key string, value interface{}) beat.Event {
out := in
out.Fields = in.Fields.Clone()

out.Fields.Put(key, value)
return out
}

cases := map[string]struct {
Expand Down Expand Up @@ -114,9 +105,9 @@ func TestRateLimit(t *testing.T) {
inEvents: inEvents,
outEvents: inEvents[0:2],
},
"rate_5_per_min": {
"rate_6_per_min": {
config: common.MapStr{
"limit": "5/m",
"limit": "6/m",
},
inEvents: inEvents,
outEvents: inEvents,
Expand All @@ -127,16 +118,25 @@ func TestRateLimit(t *testing.T) {
},
delay: 200 * time.Millisecond,
inEvents: inEvents,
outEvents: []beat.Event{inEvents[0], inEvents[1], inEvents[3]},
outEvents: []beat.Event{inEvents[0], inEvents[1], inEvents[3], inEvents[5]},
},
"with_fields": {
config: common.MapStr{
"limit": "1/s",
"fields": []string{"foo"},
},
delay: 400 * time.Millisecond,
inEvents: inEvents,
outEvents: []beat.Event{inEvents[0], inEvents[2], inEvents[3]},
delay: 400 * time.Millisecond,
inEvents: []beat.Event{
withField(inEvents[0], "foo", "bar"),
withField(inEvents[1], "foo", "bar"),
inEvents[2],
withField(inEvents[3], "foo", "seger"),
},
outEvents: []beat.Event{
withField(inEvents[0], "foo", "bar"),
inEvents[2],
withField(inEvents[3], "foo", "seger"),
},
},
"with_burst": {
config: common.MapStr{
Expand All @@ -160,7 +160,10 @@ func TestRateLimit(t *testing.T) {

out := make([]beat.Event, 0)
for _, in := range test.inEvents {
o, err := p.Run(&in)
inCopy := in
inCopy.Fields = in.Fields.Clone()

o, err := p.Run(&inCopy)
require.NoError(t, err)
if o != nil {
out = append(out, *o)
Expand Down