Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add go-metrics listeners for metric syndication #127

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ type Config struct {
AwsRegion string `yaml:"aws_region"`
AwsS3Bucket string `yaml:"aws_s3_bucket"`
AwsSecretAccessKey string `yaml:"aws_secret_access_key"`
CirconusListenerKey string `yaml:"circonus_listener_tokenkey"`
CirconusListenerInstance string `yaml:"circonus_listener_instance"`
CirconusListenerTags string `yaml:"circonus_listener_tags"`
DatadogAPIHostname string `yaml:"datadog_api_hostname"`
DatadogAPIKey string `yaml:"datadog_api_key"`
DataDogListenerAddr string `yaml:"datadog_listener_addr"`
DataDogListenerHost string `yaml:"datadog_listener_host"`
DatadogTraceAPIAddress string `yaml:"datadog_trace_api_address"`
Debug bool `yaml:"debug"`
EnableProfiling bool `yaml:"enable_profiling"`
Expand All @@ -32,6 +37,7 @@ type Config struct {
SsfAddress string `yaml:"ssf_address"`
SsfBufferSize int `yaml:"ssf_buffer_size"`
StatsAddress string `yaml:"stats_address"`
StatsDListenerAddr string `yaml:"statsd_listener_addr"`
Tags []string `yaml:"tags"`
TcpAddress string `yaml:"tcp_address"`
TLSAuthorityCertificate string `yaml:"tls_authority_certificate"`
Expand Down
25 changes: 25 additions & 0 deletions example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,28 @@ aws_s3_bucket: ""
# == LocalFile Output ==
# Include this if you want to archive data to a local file (which should then be rotated/cleaned)
flush_file: ""

# == Go-Metrics based listener backends ==
# Specify various backends to syndicate the metrics to
listeners:
# - "circonus"
# - "datadog"
# - "prometheus"
# - "statsd"

## circonus listener config
# api token - https://login.circonus.com/resources/api#authentication
circonus_listener_tokenkey: "abc123"

# specify the check name to be created for the circonus listener
circonus_listener_instance: "Veneur Listener"

# tags to add when the check is being created (comma separated)
circonus_listener_tags: "veneur"

## datadog listener config
datadog_listener_addr: "127.0.0.1:8131"
datadog_listener_host: "datadog.foo.com"

## statsd listener config
statsd_listener_addr: "127.0.0.1:8125"
106 changes: 106 additions & 0 deletions listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package veneur

import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/circonus"
datadog "github.com/armon/go-metrics/datadog"
prometheus "github.com/armon/go-metrics/prometheus"
"github.com/stripe/veneur/samplers"
)

// Listener uses a go-metrics MetricSink interface to syndicate UDPMetric
// metrics to monitoring systems external to veneur
type Listener struct {

// leading article
ListenerType string
MetricChan chan samplers.UDPMetric
MetricSink interface {
// A Gauge should retain the last value it is set to
SetGauge(key []string, val float32)

// Should emit a Key/Value pair for each call
EmitKey(key []string, val float32)

// Counters should accumulate values
IncrCounter(key []string, val float32)

// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
}
}

// NewListener creates a new Listener instance
// See https://github.com/armon/go-metrics for supported types
func NewListener(name string, conf Config) (*Listener, error) {

listener := Listener{
ListenerType: name,
MetricChan: make(chan samplers.UDPMetric),
}

switch name {
case "circonus":
// https://github.com/circonus-labs/circonus-gometrics/blob/master/OPTIONS.md
cfg := &circonus.Config{}
cfg.CheckManager.API.TokenKey = conf.CirconusListenerKey
cfg.CheckManager.Check.InstanceID = conf.CirconusListenerInstance
cfg.CheckManager.Check.Tags = conf.CirconusListenerTags

sink, err := circonus.NewCirconusSink(cfg)
if err != nil {
log.WithField("error", err).Error("error creating sink")
return nil, err
}
sink.Start()
listener.MetricSink = sink

case "datadog":
sink, err := datadog.NewDogStatsdSink(conf.DataDogListenerAddr, conf.DataDogListenerHost)
if err != nil {
log.WithField("error", err).Error("error creating datadog sink")
return nil, err
}
listener.MetricSink = sink

case "prometheus":
sink, err := prometheus.NewPrometheusSink()
if err != nil {
log.WithField("error", err).Error("error creating prometheus sink")
return nil, err
}
listener.MetricSink = sink

case "statsd":
sink, err := metrics.NewStatsdSink(conf.StatsDListenerAddr)
if err != nil {
log.WithField("error", err).Error("error creating datadog sink")
return nil, err
}
listener.MetricSink = sink
}

return &listener, nil
}

// Listen reads UDPMetrics from the metric channel and writes them to the go-metrics interface
func (l *Listener) Listen() {

log.WithField("listener", l).Debug("listener listening")

for {
select {
case m := <-l.MetricChan:
switch m.Type {
case "counter":
l.MetricSink.IncrCounter([]string{m.MetricKey.Name}, float32(m.Value.(float64)))

case "gauge":
l.MetricSink.SetGauge([]string{m.MetricKey.Name}, float32(m.Value.(float64)))

case "timer":
l.MetricSink.SetGauge([]string{m.MetricKey.Name}, float32(m.Value.(float64)))
}
}
}
}
32 changes: 32 additions & 0 deletions listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package veneur

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewListener(t *testing.T) {

config := localConfig()

// circonus listener
listenerType := "circonus"
_, err := NewListener(listenerType, config)
assert.NoError(t, err, "circonus listener should have created successfully")

// datadog listener
listenerType = "datadog"
_, err = NewListener(listenerType, config)
assert.NoError(t, err, "datadog listener should have created successfully")

// prometheus listener
listenerType = "prometheus"
_, err = NewListener(listenerType, config)
assert.NoError(t, err, "prometheus listener should have created successfully")

// statsd listener
listenerType = "statsd"
_, err = NewListener(listenerType, config)
assert.NoError(t, err, "statsd listener should have created successfully")
}
29 changes: 29 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Server struct {
Workers []*Worker
EventWorker *EventWorker
TraceWorker *TraceWorker
Listeners []*Listener

Statsd *statsd.Client
Sentry *raven.Client
Expand Down Expand Up @@ -219,6 +220,29 @@ func NewFromConfig(conf Config) (ret Server, err error) {

ret.EventWorker = NewEventWorker(ret.Statsd)

// only setup listeners on local nodes
if conf.ForwardAddress != "" {

ret.Listeners = make([]*Listener, len(conf.Listeners))
for i := range ret.Listeners {

// create the listener
listener, err := NewListener(conf.Listeners[i], conf)
if err != nil {
log.WithField("err", err).Error("could not create listener")
continue
}

// launch the listener
go func(l *Listener) {
l.Listen()
}(listener)

// register the listener
ret.Listeners[i] = listener
}
}

ret.UDPAddr, err = net.ResolveUDPAddr("udp", conf.UdpAddress)
if err != nil {
return
Expand Down Expand Up @@ -590,6 +614,11 @@ func (s *Server) HandleMetricPacket(packet []byte) error {
return err
}
s.Workers[metric.Digest%uint32(len(s.Workers))].PacketChan <- *metric

// write the metric to any backend listeners
for _, l := range s.Listeners {
l.MetricChan <- *metric
}
}
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ func generateConfig(forwardAddr string) Config {
Debug: DebugMode,
Hostname: "localhost",

// test directives for the listeners
CirconusListenerKey: "test_token",
CirconusListenerInstance: "test_instance",
DataDogListenerAddr: "127.0.0.1:8131",
DataDogListenerHost: "datadog.foo.com",
StatsDListenerAddr: "127.0.0.1:8131",

// Use a shorter interval for tests
Interval: DefaultFlushInterval.String(),
Key: "",
Expand Down