Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
Allow multiple filters to be specified
Browse files Browse the repository at this point in the history
  • Loading branch information
p-lambert committed Aug 17, 2017
1 parent 09ac151 commit 32d7386
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 40 deletions.
1 change: 1 addition & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ end
PACKAGES = %w(
./agent
./config
./filters
./fixtures
./model
./quantile
Expand Down
16 changes: 11 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/cihub/seelog"

"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/filters"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/quantizer"
"github.com/DataDog/datadog-trace-agent/sampler"
Expand Down Expand Up @@ -35,7 +36,7 @@ func (pt *processedTrace) weight() float64 {
type Agent struct {
Receiver *HTTPReceiver
Concentrator *Concentrator
Filter Filter
Filters []filters.Filter
Sampler *Sampler
Writer *Writer

Expand All @@ -57,7 +58,7 @@ func NewAgent(conf *config.AgentConfig) *Agent {
conf.ExtraAggregators,
conf.BucketInterval.Nanoseconds(),
)
f := NewResourceFilter(conf)
f := filters.Setup(conf)
s := NewSampler(conf)

w := NewWriter(conf)
Expand All @@ -66,7 +67,7 @@ func NewAgent(conf *config.AgentConfig) *Agent {
return &Agent{
Receiver: r,
Concentrator: c,
Filter: f,
Filters: f,
Sampler: s,
Writer: w,
conf: conf,
Expand Down Expand Up @@ -153,11 +154,16 @@ func (a *Agent) Process(t model.Trace) {
return
}

if !a.Filter.Keep(root) {
log.Debugf("dropping trace with blacklisted resource: %v", *root)
for _, f := range a.Filters {
if f.Keep(root) {
continue
}

log.Debugf("rejecting trace by filter: %T %v", f, *root)
ts := a.Receiver.stats.getTagStats(Tags{})
atomic.AddInt64(&ts.TracesFiltered, 1)
atomic.AddInt64(&ts.SpansFiltered, int64(len(t)))

return
}

Expand Down
4 changes: 2 additions & 2 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func BenchmarkAgentTraceProcessing(b *testing.B) {
func BenchmarkAgentTraceProcessingWithFiltering(b *testing.B) {
c := config.NewDefaultAgentConfig()
c.APIKey = "test"
c.ResourceBlacklist = []string{"[0-9]{3}", "foobar", "G.T [a-z]+", "[^123]+_baz"}
c.Blacklist["resource"] = []string{"[0-9]{3}", "foobar", "G.T [a-z]+", "[^123]+_baz"}

runTraceProcessingBenchmark(b, c)
}
Expand All @@ -93,7 +93,7 @@ func BenchmarkAgentTraceProcessingWithFiltering(b *testing.B) {
func BenchmarkAgentTraceProcessingWithWorstCaseFiltering(b *testing.B) {
c := config.NewDefaultAgentConfig()
c.APIKey = "test"
c.ResourceBlacklist = []string{"[0-9]{3}", "foobar", "aaaaa?aaaa", "[^123]+_baz"}
c.Blacklist["resource"] = []string{"[0-9]{3}", "foobar", "aaaaa?aaaa", "[^123]+_baz"}

runTraceProcessingBenchmark(b, c)
}
Expand Down
16 changes: 13 additions & 3 deletions agent/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,22 @@ func (ts *tagStats) publish() {
// Atomically load the stats from ts
tracesReceived := atomic.LoadInt64(&ts.TracesReceived)
tracesDropped := atomic.LoadInt64(&ts.TracesDropped)
tracesFiltered := atomic.LoadInt64(&ts.TracesFiltered)
tracesBytes := atomic.LoadInt64(&ts.TracesBytes)
spansReceived := atomic.LoadInt64(&ts.SpansReceived)
spansDropped := atomic.LoadInt64(&ts.SpansDropped)
spansFiltered := atomic.LoadInt64(&ts.SpansFiltered)
servicesReceived := atomic.LoadInt64(&ts.ServicesReceived)
servicesBytes := atomic.LoadInt64(&ts.ServicesBytes)

// Publish the stats
statsd.Client.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.services_received", servicesReceived, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, ts.Tags.toArray(), 1)
}
Expand All @@ -117,7 +121,7 @@ type Stats struct {
SpansReceived int64
// SpansDropped is the number of spans dropped.
SpansDropped int64
// SpansDropped is the number of spans filtered.
// SpansFiltered is the number of spans filtered.
SpansFiltered int64
// ServicesReceived is the number of services received.
ServicesReceived int64
Expand All @@ -128,19 +132,23 @@ type Stats struct {
// update atomically adds each counter carried by recent to the matching
// counter of s. Every addition is an independent atomic operation; they are
// applied in the order listed below.
func (s *Stats) update(recent Stats) {
	increments := [...]struct {
		counter *int64
		delta   int64
	}{
		{&s.TracesReceived, recent.TracesReceived},
		{&s.TracesDropped, recent.TracesDropped},
		{&s.TracesFiltered, recent.TracesFiltered},
		{&s.TracesBytes, recent.TracesBytes},
		{&s.SpansReceived, recent.SpansReceived},
		{&s.SpansDropped, recent.SpansDropped},
		{&s.SpansFiltered, recent.SpansFiltered},
		{&s.ServicesReceived, recent.ServicesReceived},
		{&s.ServicesBytes, recent.ServicesBytes},
	}

	for _, inc := range increments {
		atomic.AddInt64(inc.counter, inc.delta)
	}
}

// reset atomically stores zero into every counter of s. Each store is an
// independent atomic operation; they are applied in the order listed below.
func (s *Stats) reset() {
	counters := [...]*int64{
		&s.TracesReceived,
		&s.TracesDropped,
		&s.TracesFiltered,
		&s.TracesBytes,
		&s.SpansReceived,
		&s.SpansDropped,
		&s.SpansFiltered,
		&s.ServicesReceived,
		&s.ServicesBytes,
	}

	for _, counter := range counters {
		atomic.StoreInt64(counter, 0)
	}
}
Expand All @@ -150,12 +158,14 @@ func (s *Stats) String() string {
// Atomically load the stats
tracesReceived := atomic.LoadInt64(&s.TracesReceived)
tracesDropped := atomic.LoadInt64(&s.TracesDropped)
tracesFiltered := atomic.LoadInt64(&s.TracesFiltered)
tracesBytes := atomic.LoadInt64(&s.TracesBytes)
servicesReceived := atomic.LoadInt64(&s.ServicesReceived)
servicesBytes := atomic.LoadInt64(&s.ServicesBytes)

return fmt.Sprintf("traces received: %v, traces dropped: %v, traces amount: %v bytes, services received: %v, services amount: %v bytes",
tracesReceived, tracesDropped, tracesBytes, servicesReceived, servicesBytes)
return fmt.Sprintf("traces received: %v, traces dropped: %v, traces filtered: %v, "+
"traces amount: %v bytes, services received: %v, services amount: %v bytes",
tracesReceived, tracesDropped, tracesFiltered, tracesBytes, servicesReceived, servicesBytes)
}

// Tags holds the tags we parse when we handle the header of the payload.
Expand Down
2 changes: 1 addition & 1 deletion config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where env vars are preferable to static files
- `DD_BIND_HOST` - overrides `[Main] bind_host`
- `DD_LOG_LEVEL` - overrides `[Main] log_level`
- `DD_RECEIVER_PORT` - overrides `[trace.receiver] receiver_port`
- `DD_RESOURCE_BLACKLIST` - overrides `[trace.config] resource_blacklist`
- `DD_BLACKLIST_RESOURCE` - overrides `[trace.blacklist] resource`


## Logging
Expand Down
12 changes: 7 additions & 5 deletions config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type AgentConfig struct {
// http/s proxying
Proxy *ProxySettings

ResourceBlacklist []string
Blacklist map[string][]string
}

// mergeEnv applies overrides from environment variables to the trace agent configuration
Expand Down Expand Up @@ -105,8 +105,8 @@ func mergeEnv(c *AgentConfig) {
c.ReceiverHost = v
}

if v := os.Getenv("DD_RESOURCE_BLACKLIST"); v != "" {
c.ResourceBlacklist, _ = splitString(v, ',')
if v := os.Getenv("DD_BLACKLIST_RESOURCE"); v != "" {
c.Blacklist["resource"], _ = splitString(v, ',')
}

if v := os.Getenv("DD_DOGSTATSD_PORT"); v != "" {
Expand Down Expand Up @@ -189,6 +189,8 @@ func NewDefaultAgentConfig() *AgentConfig {
MaxCPU: 0.5, // 50%, well behaving agents keep below 5%
MaxConnections: 200, // in practice, rarely goes over 20
WatchdogInterval: time.Minute,

Blacklist: make(map[string][]string),
}

return ac
Expand Down Expand Up @@ -271,8 +273,8 @@ APM_CONF:
c.LogFilePath = v
}

if v, e := conf.GetStrArray("trace.config", "resource_blacklist", ','); e == nil {
c.ResourceBlacklist = v
if v, e := conf.GetStrArray("trace.blacklist", "resource", ','); e == nil {
c.Blacklist["resource"] = v
}

if v := strings.ToLower(conf.GetDefault("trace.config", "log_throttling", "")); v == "no" || v == "false" {
Expand Down
8 changes: 4 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ func TestGetStrArray(t *testing.T) {

func TestGetStrArrayWithCommas(t *testing.T) {
assert := assert.New(t)
f, _ := ini.Load([]byte("[trace.config]\n\nresource_blacklist = \"x,y,z\", foobar"))
f, _ := ini.Load([]byte("[trace.blacklist]\n\nresource = \"x,y,z\", foobar"))
conf := File{f, "some/path"}

vals, err := conf.GetStrArray("trace.config", "resource_blacklist", ',')
vals, err := conf.GetStrArray("trace.blacklist", "resource", ',')
assert.Nil(err)
assert.Equal(vals, []string{"x,y,z", "foobar"})
}

func TestGetStrArrayWithEscapedSequences(t *testing.T) {
assert := assert.New(t)
f, _ := ini.Load([]byte("[trace.config]\n\nresource_blacklist = \"foo\\.bar\", \"\"\""))
f, _ := ini.Load([]byte("[trace.blacklist]\n\nresource = \"foo\\.bar\", \"\"\""))
conf := File{f, "some/path"}

vals, err := conf.GetStrArray("trace.config", "resource_blacklist", ',')
vals, err := conf.GetStrArray("trace.blacklist", "resource", ',')
assert.Nil(err)
assert.Equal(vals, []string{`foo\.bar`, `"`})
}
Expand Down
18 changes: 18 additions & 0 deletions filters/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package filters

import (
"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/model"
)

// Filter is the interface implemented by all span-filters.
//
// Keep reports whether the given span passes the filter: true means the span
// is kept, false means it is rejected. The agent calls Keep on the root span
// of a trace and rejects the whole trace as soon as one filter returns false.
type Filter interface {
Keep(*model.Span) bool
}

// Setup builds the list of registered filters for the given agent
// configuration. The list currently holds a single entry, the resource filter.
func Setup(c *config.AgentConfig) []Filter {
	registered := make([]Filter, 0, 1)
	registered = append(registered, newResourceFilter(c))

	return registered
}
27 changes: 11 additions & 16 deletions agent/filter.go → filters/resource.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
package main
package filters

import (
"regexp"

"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/model"

log "github.com/cihub/seelog"
)

// Filter handles Span filtering
type Filter interface {
Keep(t *model.Span) bool
}

// ResourceFilter does resource-based filtering
type ResourceFilter struct {
// ResourceFilter implements a resource-based filter
type resourceFilter struct {
blacklist []*regexp.Regexp
}

// NewResourceFilter returns a ResourceFilter holding compiled regexes
func NewResourceFilter(conf *config.AgentConfig) *ResourceFilter {
blacklist := compileRules(conf.ResourceBlacklist)

return &ResourceFilter{blacklist}
}

// Keep returns true if Span.Resource doesn't match any of the filter's rules
func (f *ResourceFilter) Keep(t *model.Span) bool {
func (f *resourceFilter) Keep(t *model.Span) bool {
for _, entry := range f.blacklist {
if entry.MatchString(t.Resource) {
return false
Expand All @@ -36,6 +25,12 @@ func (f *ResourceFilter) Keep(t *model.Span) bool {
return true
}

// newResourceFilter returns a resource filter whose blacklist is produced by
// passing the "resource" entry of the configured blacklist to compileRules.
func newResourceFilter(conf *config.AgentConfig) Filter {
	return &resourceFilter{
		blacklist: compileRules(conf.Blacklist["resource"]),
	}
}

func compileRules(entries []string) []*regexp.Regexp {
blacklist := make([]*regexp.Regexp, 0, len(entries))

Expand Down
10 changes: 6 additions & 4 deletions agent/filter_test.go → filters/resource_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package filters

import (
"testing"
Expand Down Expand Up @@ -61,9 +61,11 @@ func TestMultipleEntries(t *testing.T) {
assert.False(t, filter.Keep(span))
}

func newTestFilter(blacklist ...string) *ResourceFilter {
conf := &config.AgentConfig{ResourceBlacklist: blacklist}
return NewResourceFilter(conf)
func newTestFilter(blacklist ...string) Filter {
c := config.NewDefaultAgentConfig()
c.Blacklist["resource"] = blacklist

return newResourceFilter(c)
}

func newTestSpan(resource string) *model.Span {
Expand Down

0 comments on commit 32d7386

Please sign in to comment.