Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Trace Filtering #309

Merged
merged 4 commits into from
Aug 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ end
PACKAGES = %w(
./agent
./config
./filters
./fixtures
./model
./quantile
Expand Down
16 changes: 16 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/cihub/seelog"

"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/filters"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/quantizer"
"github.com/DataDog/datadog-trace-agent/sampler"
Expand Down Expand Up @@ -35,6 +36,7 @@ func (pt *processedTrace) weight() float64 {
type Agent struct {
Receiver *HTTPReceiver
Concentrator *Concentrator
Filters []filters.Filter
Sampler *Sampler
Writer *Writer

Expand All @@ -56,6 +58,7 @@ func NewAgent(conf *config.AgentConfig) *Agent {
conf.ExtraAggregators,
conf.BucketInterval.Nanoseconds(),
)
f := filters.Setup(conf)
s := NewSampler(conf)

w := NewWriter(conf)
Expand All @@ -64,6 +67,7 @@ func NewAgent(conf *config.AgentConfig) *Agent {
return &Agent{
Receiver: r,
Concentrator: c,
Filters: f,
Sampler: s,
Writer: w,
conf: conf,
Expand Down Expand Up @@ -147,6 +151,18 @@ func (a *Agent) Process(t model.Trace) {

atomic.AddInt64(&ts.TracesDropped, 1)
atomic.AddInt64(&ts.SpansDropped, int64(len(t)))
return
}

for _, f := range a.Filters {
if f.Keep(root) {
continue
}

log.Debugf("rejecting trace by filter: %T %v", f, *root)
ts := a.Receiver.stats.getTagStats(Tags{})
atomic.AddInt64(&ts.TracesFiltered, 1)
atomic.AddInt64(&ts.SpansFiltered, int64(len(t)))

return
}
Expand Down
30 changes: 26 additions & 4 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,34 @@ func TestWatchdog(t *testing.T) {
}

func BenchmarkAgentTraceProcessing(b *testing.B) {
// Disable debug logs in these tests
c := config.NewDefaultAgentConfig()
c.APIKey = "test"

runTraceProcessingBenchmark(b, c)
}

// BenchmarkAgentTraceProcessingWithFiltering runs the shared trace-processing
// benchmark with four patterns registered under the "resource" ignore key of
// the agent configuration.
func BenchmarkAgentTraceProcessingWithFiltering(b *testing.B) {
c := config.NewDefaultAgentConfig()
c.APIKey = "test"
c.Ignore["resource"] = []string{"[0-9]{3}", "foobar", "G.T [a-z]+", "[^123]+_baz"}

runTraceProcessingBenchmark(b, c)
}

// BenchmarkAgentTraceProcessingWithWorstCaseFiltering covers the worst-case
// scenario: spans are tested against multiple rules without any match.
// This means we won't compensate for the overhead of filtering by dropping traces.
func BenchmarkAgentTraceProcessingWithWorstCaseFiltering(b *testing.B) {
c := config.NewDefaultAgentConfig()
c.APIKey = "test"
c.Ignore["resource"] = []string{"[0-9]{3}", "foobar", "aaaaa?aaaa", "[^123]+_baz"}

runTraceProcessingBenchmark(b, c)
}

func runTraceProcessingBenchmark(b *testing.B, c *config.AgentConfig) {
agent := NewAgent(c)
log.UseLogger(log.Disabled)

conf := config.NewDefaultAgentConfig()
conf.APIKey = "test"
agent := NewAgent(conf)
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
Expand Down
18 changes: 16 additions & 2 deletions agent/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,22 @@ func (ts *tagStats) publish() {
// Atomically load the stats from ts
tracesReceived := atomic.LoadInt64(&ts.TracesReceived)
tracesDropped := atomic.LoadInt64(&ts.TracesDropped)
tracesFiltered := atomic.LoadInt64(&ts.TracesFiltered)
tracesBytes := atomic.LoadInt64(&ts.TracesBytes)
spansReceived := atomic.LoadInt64(&ts.SpansReceived)
spansDropped := atomic.LoadInt64(&ts.SpansDropped)
spansFiltered := atomic.LoadInt64(&ts.SpansFiltered)
servicesReceived := atomic.LoadInt64(&ts.ServicesReceived)
servicesBytes := atomic.LoadInt64(&ts.ServicesBytes)

// Publish the stats
statsd.Client.Count("datadog.trace_agent.receiver.traces_received", tracesReceived, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_dropped", tracesDropped, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_filtered", tracesFiltered, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.traces_bytes", tracesBytes, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_received", spansReceived, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_dropped", spansDropped, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.spans_filtered", spansFiltered, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.services_received", servicesReceived, ts.Tags.toArray(), 1)
statsd.Client.Count("datadog.trace_agent.receiver.services_bytes", servicesBytes, ts.Tags.toArray(), 1)
}
Expand All @@ -109,12 +113,16 @@ type Stats struct {
TracesReceived int64
// TracesDropped is the number of traces dropped.
TracesDropped int64
// TracesFiltered is the number of traces filtered.
TracesFiltered int64
// TracesBytes is the amount of data received on the traces endpoint (raw data, encoded, compressed).
TracesBytes int64
// SpansReceived is the total number of spans received, including the dropped ones.
SpansReceived int64
// SpansDropped is the number of spans dropped.
SpansDropped int64
// SpansFiltered is the number of spans filtered.
SpansFiltered int64
// ServicesReceived is the number of services received.
ServicesReceived int64
// ServicesBytes is the amount of data received on the services endpoint (raw data, encoded, compressed).
Expand All @@ -124,19 +132,23 @@ type Stats struct {
// update atomically adds every counter held in recent to the matching
// counter of s. recent is received by value, so its fields are plain reads.
func (s *Stats) update(recent Stats) {
	increments := [...]struct {
		counter *int64
		delta   int64
	}{
		{&s.TracesReceived, recent.TracesReceived},
		{&s.TracesDropped, recent.TracesDropped},
		{&s.TracesFiltered, recent.TracesFiltered},
		{&s.TracesBytes, recent.TracesBytes},
		{&s.SpansReceived, recent.SpansReceived},
		{&s.SpansDropped, recent.SpansDropped},
		{&s.SpansFiltered, recent.SpansFiltered},
		{&s.ServicesReceived, recent.ServicesReceived},
		{&s.ServicesBytes, recent.ServicesBytes},
	}

	for i := range increments {
		atomic.AddInt64(increments[i].counter, increments[i].delta)
	}
}

// reset atomically stores zero into every counter of s.
func (s *Stats) reset() {
	counters := [...]*int64{
		&s.TracesReceived,
		&s.TracesDropped,
		&s.TracesFiltered,
		&s.TracesBytes,
		&s.SpansReceived,
		&s.SpansDropped,
		&s.SpansFiltered,
		&s.ServicesReceived,
		&s.ServicesBytes,
	}

	for _, counter := range counters {
		atomic.StoreInt64(counter, 0)
	}
}
Expand All @@ -146,12 +158,14 @@ func (s *Stats) String() string {
// Atomically load the stats
tracesReceived := atomic.LoadInt64(&s.TracesReceived)
tracesDropped := atomic.LoadInt64(&s.TracesDropped)
tracesFiltered := atomic.LoadInt64(&s.TracesFiltered)
tracesBytes := atomic.LoadInt64(&s.TracesBytes)
servicesReceived := atomic.LoadInt64(&s.ServicesReceived)
servicesBytes := atomic.LoadInt64(&s.ServicesBytes)

return fmt.Sprintf("traces received: %v, traces dropped: %v, traces amount: %v bytes, services received: %v, services amount: %v bytes",
tracesReceived, tracesDropped, tracesBytes, servicesReceived, servicesBytes)
return fmt.Sprintf("traces received: %v, traces dropped: %v, traces filtered: %v, "+
"traces amount: %v bytes, services received: %v, services amount: %v bytes",
tracesReceived, tracesDropped, tracesFiltered, tracesBytes, servicesReceived, servicesBytes)
}

// Tags holds the tags we parse when we handle the header of the payload.
Expand Down
5 changes: 5 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ receiver_port=8126
# how many unique client connections to allow during one 30 second lease period
connection_limit=2000

[trace.ignore]
# a blacklist of regular expressions can be provided to disable certain traces based on their resource name
# all entries must be surrounded by double quotes and separated by commas
resource="GET|POST /healthcheck","GET /V1"
```


Expand All @@ -76,6 +80,7 @@ where env vars are preferrable to static files
- `DD_BIND_HOST` - overrides `[Main] bind_host`
- `DD_LOG_LEVEL` - overrides `[Main] log_level`
- `DD_RECEIVER_PORT` - overrides `[trace.receiver] receiver_port`
- `DD_IGNORE_RESOURCE` - overrides `[trace.ignore] resource`


## Logging
Expand Down
14 changes: 13 additions & 1 deletion config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type AgentConfig struct {

// http/s proxying
Proxy *ProxySettings

Ignore map[string][]string
}

// mergeEnv applies overrides from environment variables to the trace agent configuration
Expand Down Expand Up @@ -103,6 +105,10 @@ func mergeEnv(c *AgentConfig) {
c.ReceiverHost = v
}

if v := os.Getenv("DD_IGNORE_RESOURCE"); v != "" {
c.Ignore["resource"], _ = splitString(v, ',')
}

if v := os.Getenv("DD_DOGSTATSD_PORT"); v != "" {
port, err := strconv.Atoi(v)
if err != nil {
Expand Down Expand Up @@ -183,6 +189,8 @@ func NewDefaultAgentConfig() *AgentConfig {
MaxCPU: 0.5, // 50%, well behaving agents keep below 5%
MaxConnections: 200, // in practice, rarely goes over 20
WatchdogInterval: time.Minute,

Ignore: make(map[string][]string),
}

return ac
Expand Down Expand Up @@ -265,6 +273,10 @@ APM_CONF:
c.LogFilePath = v
}

if v, e := conf.GetStrArray("trace.ignore", "resource", ','); e == nil {
c.Ignore["resource"] = v
}

if v := strings.ToLower(conf.GetDefault("trace.config", "log_throttling", "")); v == "no" || v == "false" {
c.LogThrottlingEnabled = false
}
Expand Down Expand Up @@ -295,7 +307,7 @@ APM_CONF:
c.BucketInterval = time.Duration(v) * time.Second
}

if v, e := conf.GetStrArray("trace.concentrator", "extra_aggregators", ","); e == nil {
if v, e := conf.GetStrArray("trace.concentrator", "extra_aggregators", ','); e == nil {
c.ExtraAggregators = append(c.ExtraAggregators, v...)
} else {
log.Debug("No aggregator configuration, using defaults")
Expand Down
30 changes: 22 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"encoding/csv"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -90,19 +91,32 @@ func (c *File) GetFloat(section, name string) (float64, error) {
}

// GetStrArray returns the value split across `sep` into an array of strings.
func (c *File) GetStrArray(section, name, sep string) ([]string, error) {
if exists := c.instance.Section(section).HasKey(name); !exists {
return []string{}, fmt.Errorf("missing `%s` value in [%s] section", name, section)
}
func (c *File) GetStrArray(section, name string, sep rune) ([]string, error) {
value, err := c.Get(section, name)

value := c.instance.Section(section).Key(name).String()
if value == "" {
return []string{}, nil
if err != nil || value == "" {
return []string{}, err
}
return strings.Split(value, sep), nil

return splitString(value, sep)
}

// GetSection is a convenience method that returns the entire ini section
// named key. It delegates to the wrapped ini instance and passes its result
// and error through unchanged.
func (c *File) GetSection(key string) (*ini.Section, error) {
return c.instance.GetSection(key)
}

// splitString parses s as a single CSV record whose fields are delimited by
// sep and returns those fields. Leading whitespace in a field is trimmed and
// quotes are handled leniently, so a double-quoted field may contain sep.
// Only the first record of s is read. On any read error (including an empty
// input) an empty slice is returned together with the error.
func splitString(s string, sep rune) ([]string, error) {
	reader := csv.NewReader(strings.NewReader(s))
	reader.Comma = sep
	reader.LazyQuotes = true
	reader.TrimLeadingSpace = true

	fields, err := reader.Read()
	if err == nil {
		return fields, nil
	}

	return []string{}, err
}
24 changes: 22 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,31 @@ func TestGetStrArray(t *testing.T) {
"some/path",
}

ports, err := conf.GetStrArray("Main", "ports", ",")
ports, err := conf.GetStrArray("Main", "ports", ',')
assert.Nil(err)
assert.Equal(ports, []string{"10", "15", "20", "25"})
}

// TestGetStrArrayWithCommas checks that a double-quoted entry may contain the
// separator itself without being split into several values.
func TestGetStrArrayWithCommas(t *testing.T) {
	assert := assert.New(t)

	// Fail fast if the fixture cannot be parsed instead of continuing with a nil file.
	f, err := ini.Load([]byte("[trace.ignore]\n\nresource = \"x,y,z\", foobar"))
	if err != nil {
		t.Fatalf("loading ini fixture: %v", err)
	}
	conf := File{f, "some/path"}

	vals, err := conf.GetStrArray("trace.ignore", "resource", ',')
	assert.Nil(err)
	// Expected value comes first so failure output labels the two sides correctly.
	assert.Equal([]string{"x,y,z", "foobar"}, vals)
}

// TestGetStrArrayWithEscapedSequences checks that backslashes inside a quoted
// entry are preserved verbatim and that a doubled quote yields a literal quote.
func TestGetStrArrayWithEscapedSequences(t *testing.T) {
	assert := assert.New(t)

	// Fail fast if the fixture cannot be parsed instead of continuing with a nil file.
	f, err := ini.Load([]byte("[trace.ignore]\n\nresource = \"foo\\.bar\", \"\"\""))
	if err != nil {
		t.Fatalf("loading ini fixture: %v", err)
	}
	conf := File{f, "some/path"}

	vals, err := conf.GetStrArray("trace.ignore", "resource", ',')
	assert.Nil(err)
	// Expected value comes first so failure output labels the two sides correctly.
	assert.Equal([]string{`foo\.bar`, `"`}, vals)
}

func TestGetStrArrayEmpty(t *testing.T) {
assert := assert.New(t)
f, _ := ini.Load([]byte("[Main]\n\nports = "))
Expand All @@ -32,7 +52,7 @@ func TestGetStrArrayEmpty(t *testing.T) {
"some/path",
}

ports, err := conf.GetStrArray("Main", "ports", ",")
ports, err := conf.GetStrArray("Main", "ports", ',')
assert.Nil(err)
assert.Equal([]string{}, ports)
}
Expand Down
18 changes: 18 additions & 0 deletions filters/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package filters

import (
"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/model"
)

// Filter is the interface implemented by all span filters.
type Filter interface {
// Keep reports whether the given span passes the filter. The agent's
// Process method rejects a trace as soon as one filter returns false for
// the span it passes in.
Keep(*model.Span) bool
}

// Setup returns a slice of all registered filters, built from the agent
// configuration c. At present the slice holds a single entry: the filter
// created by newResourceFilter.
func Setup(c *config.AgentConfig) []Filter {
return []Filter{
newResourceFilter(c),
}
}
Loading