diff --git a/CHANGELOG.md b/CHANGELOG.md index 7456e2345332a..429f934ca8788 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ - [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support - [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin +- [#2293](https://github.com/influxdata/telegraf/pull/2293): Add TCP listener for statsd input - [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted - [#2817](https://github.com/influxdata/telegraf/pull/2817): Add timeout option to IPMI sensor plugin - [#2883](https://github.com/influxdata/telegraf/pull/2883): Add support for an optional SSL/TLS configuration to nginx input plugin diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 3e1395c02f3b4..7d1f2562632bd 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -2452,6 +2452,10 @@ # # Statsd Server # [[inputs.statsd]] +# ## Protocol, must be "tcp" or "udp" +# protocol = "udp" +# ## Maximum number of concurrent TCP connections to allow +# max_tcp_connections = 250 # ## Address and port to host UDP listener on # service_address = ":8125" # diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index 91070419ab25c..17f960b2050ae 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -5,6 +5,12 @@ ```toml # Statsd Server [[inputs.statsd]] + ## Protocol, must be "tcp" or "udp" (default=udp) + protocol = "udp" + + ## MaxTCPConnection - applicable when protocol is set to tcp (default=250) + max_tcp_connections = 250 + ## Address and port to host UDP listener on service_address = ":8125" @@ -146,6 +152,9 @@ metric type: ### Plugin arguments +- **protocol** string: Protocol used in listener - tcp or udp options +- **max_tcp_connections** []int: Maximum number of concurrent TCP connections +to allow. Used when protocol is set to tcp. - **service_address** string: Address to listen for statsd UDP packets on - **delete_gauges** boolean: Delete gauges on every collection interval - **delete_counters** boolean: Delete counters on every collection interval diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 144a3675f207c..16233a6c95a7e 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -1,6 +1,7 @@ package statsd import ( + "bufio" "errors" "fmt" "log" @@ -14,7 +15,9 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/selfstat" ) const ( @@ -24,15 +27,24 @@ const ( defaultFieldName = "value" + defaultProtocol = "udp" + defaultSeparator = "_" defaultAllowPendingMessage = 10000 + MaxTCPConnections = 250 ) var dropwarn = "E! Error: statsd message queue full. " + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" +var malformedwarn = "E! Statsd over TCP has received %d malformed packets" + + " thus far." + type Statsd struct { + // Protocol used on listener - udp or tcp + Protocol string `toml:"protocol"` + // Address & Port to serve from ServiceAddress string @@ -64,9 +76,17 @@ type Statsd struct { UDPPacketSize int `toml:"udp_packet_size"` sync.Mutex - wg sync.WaitGroup + // Lock for preventing a data race during resource cleanup + cleanup sync.Mutex + wg sync.WaitGroup + // accept channel tracks how many active connections there are, if there + // is an available bool in accept, then we are below the maximum and can + // accept the connection + accept chan bool // drops tracks the number of dropped metrics. drops int + // malformed tracks the number of malformed packets + malformed int // Channel for all incoming statsd packets in chan []byte @@ -83,9 +103,24 @@ type Statsd struct { // bucket -> influx templates Templates []string - listener *net.UDPConn + // Protocol listeners + UDPlistener *net.UDPConn + TCPlistener *net.TCPListener + + // track current connections so we can close them in Stop() + conns map[string]*net.TCPConn + + MaxTCPConnections int `toml:"max_tcp_connections"` graphiteParser *graphite.GraphiteParser + + acc telegraf.Accumulator + + MaxConnections selfstat.Stat + CurrentConnections selfstat.Stat + TotalConnections selfstat.Stat + PacketsRecv selfstat.Stat + BytesRecv selfstat.Stat } // One statsd metric, form is :||@ @@ -128,10 +163,16 @@ type cachedtimings struct { } func (_ *Statsd) Description() string { - return "Statsd Server" + return "Statsd UDP/TCP Server" } const sampleConfig = ` + ## Protocol, must be "tcp" or "udp" (default=udp) + protocol = "udp" + + ## MaxTCPConnection - applicable when protocol is set to tcp (default=250) + max_tcp_connections = 250 + ## Address and port to host UDP listener on service_address = ":8125" @@ -247,6 +288,27 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.sets = make(map[string]cachedset) s.timings = make(map[string]cachedtimings) + s.Lock() + defer s.Unlock() + // + tags := map[string]string{ + "address": s.ServiceAddress, + } + s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags) + s.MaxConnections.Set(int64(s.MaxTCPConnections)) + s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags) + s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags) + s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags) + s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags) + + s.in = make(chan []byte, s.AllowedPendingMessages) + s.done = make(chan struct{}) + s.accept = make(chan bool, s.MaxTCPConnections) + s.conns = make(map[string]*net.TCPConn) + for i := 0; i < s.MaxTCPConnections; i++ { + s.accept <- true + } + if s.ConvertNames { log.Printf("I! WARNING statsd: convert_names config option is deprecated," + " please use metric_separator instead") @@ -258,23 +320,67 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error { s.wg.Add(2) // Start the UDP listener - go s.udpListen() + switch s.Protocol { + case "udp": + go s.udpListen() + case "tcp": + go s.tcpListen() + } // Start the line parser go s.parser() log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress) return nil } +// tcpListen() starts listening for udp packets on the configured port. +func (s *Statsd) tcpListen() error { + defer s.wg.Done() + // Start listener + var err error + address, _ := net.ResolveTCPAddr("tcp", s.ServiceAddress) + s.TCPlistener, err = net.ListenTCP("tcp", address) + if err != nil { + log.Fatalf("ERROR: ListenTCP - %s", err) + return err + } + log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String()) + for { + select { + case <-s.done: + return nil + default: + // Accept connection: + conn, err := s.TCPlistener.AcceptTCP() + if err != nil { + return err + } + + select { + case <-s.accept: + // not over connection limit, handle the connection properly. + s.wg.Add(1) + // generate a random id for this TCPConn + id := internal.RandomString(6) + s.remember(id, conn) + go s.handler(conn, id) + default: + // We are over the connection limit, refuse & close. + s.refuser(conn) + } + } + } +} + // udpListen starts listening for udp packets on the configured port. func (s *Statsd) udpListen() error { defer s.wg.Done() var err error address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress) - s.listener, err = net.ListenUDP("udp", address) + s.UDPlistener, err = net.ListenUDP("udp", address) if err != nil { log.Fatalf("ERROR: ListenUDP - %s", err) } - log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String()) + log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String()) buf := make([]byte, UDP_MAX_PACKET_SIZE) for { @@ -282,7 +388,7 @@ func (s *Statsd) udpListen() error { case <-s.done: return nil default: - n, _, err := s.listener.ReadFromUDP(buf) + n, _, err := s.UDPlistener.ReadFromUDP(buf) if err != nil && !strings.Contains(err.Error(), "closed network") { log.Printf("E! Error READ: %s\n", err.Error()) continue @@ -637,20 +743,111 @@ func (s *Statsd) aggregate(m metric) { } } +// handler handles a single TCP Connection +func (s *Statsd) handler(conn *net.TCPConn, id string) { + s.CurrentConnections.Incr(1) + s.TotalConnections.Incr(1) + // connection cleanup function + defer func() { + s.wg.Done() + conn.Close() + // Add one connection potential back to channel when this one closes + s.accept <- true + s.forget(id) + s.CurrentConnections.Incr(-1) + }() + + var n int + scanner := bufio.NewScanner(conn) + for { + select { + case <-s.done: + return + default: + if !scanner.Scan() { + return + } + n = len(scanner.Bytes()) + if n == 0 { + continue + } + s.BytesRecv.Incr(int64(n)) + s.PacketsRecv.Incr(1) + bufCopy := make([]byte, n+1) + copy(bufCopy, scanner.Bytes()) + bufCopy[n] = '\n' + + select { + case s.in <- bufCopy: + default: + s.drops++ + if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { + log.Printf(dropwarn, s.drops) + } + } + } + } +} + +// refuser refuses a TCP connection +func (s *Statsd) refuser(conn *net.TCPConn) { + conn.Close() + log.Printf("I! Refused TCP Connection from %s", conn.RemoteAddr()) + log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to" + + " adjust max_tcp_connections") +} + +// forget a TCP connection +func (s *Statsd) forget(id string) { + s.cleanup.Lock() + defer s.cleanup.Unlock() + delete(s.conns, id) +} + +// remember a TCP connection +func (s *Statsd) remember(id string, conn *net.TCPConn) { + s.cleanup.Lock() + defer s.cleanup.Unlock() + s.conns[id] = conn +} + func (s *Statsd) Stop() { s.Lock() defer s.Unlock() log.Println("I! Stopping the statsd service") close(s.done) - s.listener.Close() + switch s.Protocol { + case "udp": + s.UDPlistener.Close() + case "tcp": + s.TCPlistener.Close() + // Close all open TCP connections + // - get all conns from the s.conns map and put into slice + // - this is so the forget() function doesnt conflict with looping + // over the s.conns map + var conns []*net.TCPConn + s.cleanup.Lock() + for _, conn := range s.conns { + conns = append(conns, conn) + } + s.cleanup.Unlock() + for _, conn := range conns { + conn.Close() + } + default: + s.UDPlistener.Close() + } s.wg.Wait() close(s.in) + log.Println("I! Stopped Statsd listener service on ", s.ServiceAddress) } func init() { inputs.Add("statsd", func() telegraf.Input { return &Statsd{ + Protocol: defaultProtocol, ServiceAddress: ":8125", + MaxTCPConnections: 250, MetricSeparator: "_", AllowedPendingMessages: defaultAllowPendingMessage, DeleteCounters: true, diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 486c6cece15bc..c8164d4f76246 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -3,11 +3,32 @@ package statsd import ( "errors" "fmt" + "net" "testing" + "time" "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +const ( + testMsg = "test.tcp.msg:100|c" +) + +func newTestTcpListener() (*Statsd, chan []byte) { + in := make(chan []byte, 1500) + listener := &Statsd{ + Protocol: "tcp", + ServiceAddress: ":8125", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + in: in, + done: make(chan struct{}), + } + return listener, in +} + func NewTestStatsd() *Statsd { s := Statsd{} @@ -24,6 +45,116 @@ func NewTestStatsd() *Statsd { return &s } +// Test that MaxTCPConections is respected +func TestConcurrentConns(t *testing.T) { + listener := Statsd{ + Protocol: "tcp", + ServiceAddress: ":8125", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected when max==1 +func TestConcurrentConns1(t *testing.T) { + listener := Statsd{ + Protocol: "tcp", + ServiceAddress: ":8125", + AllowedPendingMessages: 10000, + MaxTCPConnections: 1, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + + // Connection over the limit: + conn, err := net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + _, err = conn.Write([]byte(testMsg)) + assert.NoError(t, err) + time.Sleep(time.Millisecond * 10) + assert.Zero(t, acc.NFields()) +} + +// Test that MaxTCPConections is respected +func TestCloseConcurrentConns(t *testing.T) { + listener := Statsd{ + Protocol: "tcp", + ServiceAddress: ":8125", + AllowedPendingMessages: 10000, + MaxTCPConnections: 2, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Start(acc)) + + time.Sleep(time.Millisecond * 25) + _, err := net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + _, err = net.Dial("tcp", "127.0.0.1:8125") + assert.NoError(t, err) + + listener.Stop() +} + +// benchmark how long it takes to accept & process 100,000 metrics: +func BenchmarkTCP(b *testing.B) { + listener := Statsd{ + Protocol: "tcp", + ServiceAddress: ":8125", + AllowedPendingMessages: 250000, + MaxTCPConnections: 250, + } + acc := &testutil.Accumulator{Discard: true} + + // send multiple messages to socket + for n := 0; n < b.N; n++ { + err := listener.Start(acc) + if err != nil { + panic(err) + } + + time.Sleep(time.Millisecond * 25) + conn, err := net.Dial("tcp", "127.0.0.1:8125") + if err != nil { + panic(err) + } + for i := 0; i < 250000; i++ { + fmt.Fprintf(conn, testMsg) + } + // wait for 250,000 metrics to get added to accumulator + time.Sleep(time.Millisecond) + listener.Stop() + } +} + // Valid lines should be parsed and their values should be cached func TestParse_ValidLines(t *testing.T) { s := NewTestStatsd()