diff --git a/config.go b/config.go index d236eac26..b5798205b 100644 --- a/config.go +++ b/config.go @@ -7,8 +7,13 @@ type Config struct { AwsRegion string `yaml:"aws_region"` AwsS3Bucket string `yaml:"aws_s3_bucket"` AwsSecretAccessKey string `yaml:"aws_secret_access_key"` + CirconusListenerKey string `yaml:"circonus_listener_tokenkey"` + CirconusListenerInstance string `yaml:"circonus_listener_instance"` + CirconusListenerTags string `yaml:"circonus_listener_tags"` DatadogAPIHostname string `yaml:"datadog_api_hostname"` DatadogAPIKey string `yaml:"datadog_api_key"` + DataDogListenerAddr string `yaml:"datadog_listener_addr"` + DataDogListenerHost string `yaml:"datadog_listener_host"` DatadogTraceAPIAddress string `yaml:"datadog_trace_api_address"` Debug bool `yaml:"debug"` EnableProfiling bool `yaml:"enable_profiling"` @@ -32,6 +37,7 @@ type Config struct { SsfAddress string `yaml:"ssf_address"` SsfBufferSize int `yaml:"ssf_buffer_size"` StatsAddress string `yaml:"stats_address"` + StatsDListenerAddr string `yaml:"statsd_listener_addr"` Tags []string `yaml:"tags"` TcpAddress string `yaml:"tcp_address"` TLSAuthorityCertificate string `yaml:"tls_authority_certificate"` diff --git a/example.yaml b/example.yaml index a408f74ea..be20dadad 100644 --- a/example.yaml +++ b/example.yaml @@ -154,3 +154,28 @@ aws_s3_bucket: "" # == LocalFile Output == # Include this if you want to archive data to a local file (which should then be rotated/cleaned) flush_file: "" + +# == Go-Metrics based listener backends == +# Specify various backends to syndicate the metrics to +listeners: + # - "circonus" + # - "datadog" + # - "prometheus" + # - "statsd" + +## circonus listener config +# api token - https://login.circonus.com/resources/api#authentication +circonus_listener_tokenkey: "abc123" + +# specify the check name to be created for the circonus listener +circonus_listener_instance: "Veneur Listener" + +# tags to add when the check is being created (comma separated) +circonus_listener_tags: "veneur" + +## datadog listener config +datadog_listener_addr: "127.0.0.1:8131" +datadog_listener_host: "datadog.foo.com" + +## statsd listener config +statsd_listener_addr: "127.0.0.1:8125" diff --git a/listener.go b/listener.go new file mode 100644 index 000000000..0e9a438d3 --- /dev/null +++ b/listener.go @@ -0,0 +1,106 @@ +package veneur + +import ( + "github.com/armon/go-metrics" + "github.com/armon/go-metrics/circonus" + datadog "github.com/armon/go-metrics/datadog" + prometheus "github.com/armon/go-metrics/prometheus" + "github.com/stripe/veneur/samplers" +) + +// Listener uses a go-metrics MetricSink interface to syndicate UDPMetric +// metrics to monitoring systems external to veneur +type Listener struct { + + // leading article + ListenerType string + MetricChan chan samplers.UDPMetric + MetricSink interface { + // A Gauge should retain the last value it is set to + SetGauge(key []string, val float32) + + // Should emit a Key/Value pair for each call + EmitKey(key []string, val float32) + + // Counters should accumulate values + IncrCounter(key []string, val float32) + + // Samples are for timing information, where quantiles are used + AddSample(key []string, val float32) + } +} + +// NewListener creates a new Listener instance +// See https://github.com/armon/go-metrics for supported types +func NewListener(name string, conf Config) (*Listener, error) { + + listener := Listener{ + ListenerType: name, + MetricChan: make(chan samplers.UDPMetric), + } + + switch name { + case "circonus": + // https://github.com/circonus-labs/circonus-gometrics/blob/master/OPTIONS.md + cfg := &circonus.Config{} + cfg.CheckManager.API.TokenKey = conf.CirconusListenerKey + cfg.CheckManager.Check.InstanceID = conf.CirconusListenerInstance + cfg.CheckManager.Check.Tags = conf.CirconusListenerTags + + sink, err := circonus.NewCirconusSink(cfg) + if err != nil { + log.WithField("error", err).Error("error creating sink") + return nil, err + } + sink.Start() + listener.MetricSink = sink + + case "datadog": + sink, err := datadog.NewDogStatsdSink(conf.DataDogListenerAddr, conf.DataDogListenerHost) + if err != nil { + log.WithField("error", err).Error("error creating datadog sink") + return nil, err + } + listener.MetricSink = sink + + case "prometheus": + sink, err := prometheus.NewPrometheusSink() + if err != nil { + log.WithField("error", err).Error("error creating prometheus sink") + return nil, err + } + listener.MetricSink = sink + + case "statsd": + sink, err := metrics.NewStatsdSink(conf.StatsDListenerAddr) + if err != nil { + log.WithField("error", err).Error("error creating datadog sink") + return nil, err + } + listener.MetricSink = sink + } + + return &listener, nil +} + +// Listen reads UDPMetrics from the metric channel and writes them to the go-metrics interface +func (l *Listener) Listen() { + + log.WithField("listener", l).Debug("listener listening") + + for { + select { + case m := <-l.MetricChan: + switch m.Type { + case "counter": + l.MetricSink.IncrCounter([]string{m.MetricKey.Name}, float32(m.Value.(float64))) + + case "gauge": + l.MetricSink.SetGauge([]string{m.MetricKey.Name}, float32(m.Value.(float64))) + + case "timer": + l.MetricSink.SetGauge([]string{m.MetricKey.Name}, float32(m.Value.(float64))) + } + } + } +} diff --git a/listener_test.go b/listener_test.go new file mode 100644 index 000000000..c5ffd9c39 --- /dev/null +++ b/listener_test.go @@ -0,0 +1,32 @@ +package veneur + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewListener(t *testing.T) { + + config := localConfig() + + // circonus listener + listenerType := "circonus" + _, err := NewListener(listenerType, config) + assert.NoError(t, err, "circonus listener should have created successfully") + + // datadog listener + listenerType = "datadog" + _, err = NewListener(listenerType, config) + assert.NoError(t, err, "datadog listener should have created successfully") + + // prometheus listener + listenerType = "prometheus" + _, err = NewListener(listenerType, config) + assert.NoError(t, err, "prometheus listener should have created successfully") + + // statsd listener + listenerType = "statsd" + _, err = NewListener(listenerType, config) + assert.NoError(t, err, "statsd listener should have created successfully") +} diff --git a/server.go b/server.go index 1089ec101..aa7647aa4 100644 --- a/server.go +++ b/server.go @@ -67,6 +67,7 @@ type Server struct { Workers []*Worker EventWorker *EventWorker TraceWorker *TraceWorker + Listeners []*Listener Statsd *statsd.Client Sentry *raven.Client @@ -219,6 +220,29 @@ func NewFromConfig(conf Config) (ret Server, err error) { ret.EventWorker = NewEventWorker(ret.Statsd) + // only setup listeners on local nodes + if conf.ForwardAddress != "" { + + ret.Listeners = make([]*Listener, len(conf.Listeners)) + for i := range ret.Listeners { + + // create the listener + listener, err := NewListener(conf.Listeners[i], conf) + if err != nil { + log.WithField("err", err).Error("could not create listener") + continue + } + + // launch the listener + go func(l *Listener) { + l.Listen() + }(listener) + + // register the listener + ret.Listeners[i] = listener + } + } + ret.UDPAddr, err = net.ResolveUDPAddr("udp", conf.UdpAddress) if err != nil { return @@ -590,6 +614,11 @@ func (s *Server) HandleMetricPacket(packet []byte) error { return err } s.Workers[metric.Digest%uint32(len(s.Workers))].PacketChan <- *metric + + // write the metric to any backend listeners + for _, l := range s.Listeners { + l.MetricChan <- *metric + } } return nil } diff --git a/server_test.go b/server_test.go index 76bccc08e..d33a99b40 100644 --- a/server_test.go +++ b/server_test.go @@ -80,6 +80,13 @@ func generateConfig(forwardAddr string) Config { Debug: DebugMode, Hostname: "localhost", + // test directives for the listeners + CirconusListenerKey: "test_token", + CirconusListenerInstance: "test_instance", + DataDogListenerAddr: "127.0.0.1:8131", + DataDogListenerHost: "datadog.foo.com", + StatsDListenerAddr: "127.0.0.1:8131", + // Use a shorter interval for tests Interval: DefaultFlushInterval.String(), Key: "",