Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TCP listener for statsd input #2293

Merged
merged 29 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6623b4b
test for disable tag sorting
szibis Jan 2, 2017
728ba11
Add tcp listener to statsd input
szibis Jan 12, 2017
d0ab7cd
Readme and example config updates
szibis Jan 13, 2017
4282b4a
Adding default values for new arguments in doc
szibis Jan 13, 2017
589e4db
Sefl stats with tcp information in name
szibis Jan 17, 2017
d8b20e8
Add TCP listener tests for statsd input
szibis Jan 18, 2017
4be8bf0
Revert "test for disable tag sorting"
szibis Jan 18, 2017
123428d
Merge remote-tracking branch 'upstream/master'
szibis Mar 13, 2017
a1f11b7
Add TCP listenedr for statsd PR to Changelog
szibis Mar 13, 2017
07cdccb
Merge remote-tracking branch 'upstream/master'
szibis Apr 13, 2017
a104281
Merge remote-tracking branch 'upstream/master'
szibis May 17, 2017
12614ca
Move changelog line from 1.3 to 1.4 info
szibis May 17, 2017
b0b459e
test for disable tag sorting
szibis Jan 2, 2017
f41b247
Add tcp listener to statsd input
szibis Jan 12, 2017
a8121e6
Readme and example config updates
szibis Jan 13, 2017
4c13754
Adding default values for new arguments in doc
szibis Jan 13, 2017
c2e0aa0
Sefl stats with tcp information in name
szibis Jan 17, 2017
aba92e3
Add TCP listener tests for statsd input
szibis Jan 18, 2017
228cf6a
Revert "test for disable tag sorting"
szibis Jan 18, 2017
dc1d190
Add TCP listenedr for statsd PR to Changelog
szibis Mar 13, 2017
60ad86d
Move changelog line from 1.3 to 1.4 info
szibis May 17, 2017
69f21fc
Merge branch 'master' of github.com:szibis/telegraf
szibis May 18, 2017
e595a04
Merge remote-tracking branch 'upstream/master'
szibis Aug 4, 2017
5dc888f
Changelog update
szibis Aug 4, 2017
7eb4a0f
Just closing connection on refuse with log info
szibis Aug 4, 2017
7285190
Fix test by removing info in return on refuse
szibis Aug 4, 2017
d7095f9
Fix test by removing info in return on refuse - remove unused variables
szibis Aug 4, 2017
e7b4c32
Merge remote-tracking branch 'upstream/master'
szibis Aug 8, 2017
036fe5d
remove duplicate line in changelog
szibis Aug 8, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

- [#2487](https://github.com/influxdata/telegraf/pull/2487): Add Kafka 0.9+ consumer support
- [#2773](https://github.com/influxdata/telegraf/pull/2773): Add support for self-signed certs to InfluxDB input plugin
- [#2293](https://github.com/influxdata/telegraf/pull/2293): Add TCP listener for statsd input
- [#2581](https://github.com/influxdata/telegraf/pull/2581): Add Docker container environment variables as tags. Only whitelisted
- [#2817](https://github.com/influxdata/telegraf/pull/2817): Add timeout option to IPMI sensor plugin
- [#2883](https://github.com/influxdata/telegraf/pull/2883): Add support for an optional SSL/TLS configuration to nginx input plugin
Expand Down
4 changes: 4 additions & 0 deletions etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,10 @@

# # Statsd Server
# [[inputs.statsd]]
# ## Protocol, must be "tcp" or "udp"
# protocol = "udp"
# ## Maximum number of concurrent TCP connections to allow
# max_tcp_connections = 250
# ## Address and port to host UDP listener on
# service_address = ":8125"
#
Expand Down
9 changes: 9 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
```toml
# Statsd Server
[[inputs.statsd]]
## Protocol, must be "tcp" or "udp" (default=udp)
protocol = "udp"

## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
max_tcp_connections = 250

## Address and port to host UDP listener on
service_address = ":8125"

Expand Down Expand Up @@ -146,6 +152,9 @@ metric type:

### Plugin arguments

- **protocol** string: Protocol used by the listener, either `tcp` or `udp`
- **max_tcp_connections** int: Maximum number of concurrent TCP connections
to allow. Used when protocol is set to tcp.
- **service_address** string: Address to listen on for statsd UDP packets or TCP connections
- **delete_gauges** boolean: Delete gauges on every collection interval
- **delete_counters** boolean: Delete counters on every collection interval
Expand Down
213 changes: 205 additions & 8 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package statsd

import (
"bufio"
"errors"
"fmt"
"log"
Expand All @@ -14,7 +15,9 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/graphite"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)

const (
Expand All @@ -24,15 +27,24 @@ const (

defaultFieldName = "value"

defaultProtocol = "udp"

defaultSeparator = "_"
defaultAllowPendingMessage = 10000
MaxTCPConnections = 250
)

// dropwarn is the log format emitted when a message cannot be queued because
// the pending-message channel is full. Its single %d verb takes the running
// count of dropped messages.
var dropwarn = "E! Error: statsd message queue full. We have dropped %d " +
	"messages so far. You may want to increase allowed_pending_messages " +
	"in the config\n"

// malformedwarn is a log format whose single %d verb takes a count of
// malformed packets received over TCP.
// NOTE(review): presumably paired with the Statsd.malformed counter; no use
// is visible in this part of the file - confirm.
var malformedwarn = "E! Statsd over TCP has received %d " +
	"malformed packets thus far."

type Statsd struct {
// Protocol used on listener - udp or tcp
Protocol string `toml:"protocol"`

// Address & Port to serve from
ServiceAddress string

Expand Down Expand Up @@ -64,9 +76,17 @@ type Statsd struct {
UDPPacketSize int `toml:"udp_packet_size"`

sync.Mutex
wg sync.WaitGroup
// Lock for preventing a data race during resource cleanup
cleanup sync.Mutex
wg sync.WaitGroup
// accept channel tracks how many active connections there are, if there
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool
// drops tracks the number of dropped metrics.
drops int
// malformed tracks the number of malformed packets
malformed int

// Channel for all incoming statsd packets
in chan []byte
Expand All @@ -83,9 +103,24 @@ type Statsd struct {
// bucket -> influx templates
Templates []string

listener *net.UDPConn
// Protocol listeners
UDPlistener *net.UDPConn
TCPlistener *net.TCPListener

// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn

MaxTCPConnections int `toml:"max_tcp_connections"`

graphiteParser *graphite.GraphiteParser

acc telegraf.Accumulator

MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
PacketsRecv selfstat.Stat
BytesRecv selfstat.Stat
}

// One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
Expand Down Expand Up @@ -128,10 +163,16 @@ type cachedtimings struct {
}

// Description returns a one-line, human-readable summary of the plugin.
// The plugin can listen on either UDP or TCP, so the summary names both.
func (_ *Statsd) Description() string {
	return "Statsd UDP/TCP Server"
}

const sampleConfig = `
## Protocol, must be "tcp" or "udp" (default=udp)
protocol = "udp"

## MaxTCPConnection - applicable when protocol is set to tcp (default=250)
max_tcp_connections = 250

## Address and port to host UDP listener on
service_address = ":8125"

Expand Down Expand Up @@ -247,6 +288,27 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
s.sets = make(map[string]cachedset)
s.timings = make(map[string]cachedtimings)

s.Lock()
defer s.Unlock()
//
tags := map[string]string{
"address": s.ServiceAddress,
}
s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)

s.in = make(chan []byte, s.AllowedPendingMessages)
s.done = make(chan struct{})
s.accept = make(chan bool, s.MaxTCPConnections)
s.conns = make(map[string]*net.TCPConn)
for i := 0; i < s.MaxTCPConnections; i++ {
s.accept <- true
}

if s.ConvertNames {
log.Printf("I! WARNING statsd: convert_names config option is deprecated," +
" please use metric_separator instead")
Expand All @@ -258,31 +320,75 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {

s.wg.Add(2)
// Start the UDP listener
go s.udpListen()
switch s.Protocol {
case "udp":
go s.udpListen()
case "tcp":
go s.tcpListen()
}
// Start the line parser
go s.parser()
log.Printf("I! Started the statsd service on %s\n", s.ServiceAddress)
return nil
}

// tcpListen starts listening for TCP connections on the configured
// ServiceAddress and hands every accepted connection to its own handler
// goroutine.
//
// The number of concurrently served connections is bounded by the tokens
// held in s.accept: a connection is only served if a token can be taken
// without blocking, otherwise it is refused and closed straight away.
//
// The loop ends when s.done is closed or when AcceptTCP fails, which is what
// happens once Stop closes the listener. The accept error is returned in that
// case; nil is returned when s.done was observed first. Failing to resolve
// or bind the address is fatal for the whole process.
func (s *Statsd) tcpListen() error {
	defer s.wg.Done()

	// Resolve the address explicitly: ListenTCP accepts a nil address and
	// would then bind an automatically chosen port instead of failing.
	address, err := net.ResolveTCPAddr("tcp", s.ServiceAddress)
	if err != nil {
		// log.Fatalf terminates the process, so nothing after it runs.
		log.Fatalf("ERROR: ResolveTCPAddr - %s", err)
	}

	// Start listener
	s.TCPlistener, err = net.ListenTCP("tcp", address)
	if err != nil {
		log.Fatalf("ERROR: ListenTCP - %s", err)
	}
	log.Println("I! TCP Statsd listening on: ", s.TCPlistener.Addr().String())

	for {
		select {
		case <-s.done:
			return nil
		default:
			// Accept connection; this blocks until a client connects or
			// the listener is closed.
			conn, err := s.TCPlistener.AcceptTCP()
			if err != nil {
				return err
			}

			select {
			case <-s.accept:
				// not over connection limit, handle the connection properly.
				s.wg.Add(1)
				// generate a random id for this TCPConn
				id := internal.RandomString(6)
				s.remember(id, conn)
				go s.handler(conn, id)
			default:
				// We are over the connection limit, refuse & close.
				s.refuser(conn)
			}
		}
	}
}

// udpListen starts listening for udp packets on the configured port.
func (s *Statsd) udpListen() error {
defer s.wg.Done()
var err error
address, _ := net.ResolveUDPAddr("udp", s.ServiceAddress)
s.listener, err = net.ListenUDP("udp", address)
s.UDPlistener, err = net.ListenUDP("udp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}
log.Println("I! Statsd listener listening on: ", s.listener.LocalAddr().String())
log.Println("I! Statsd UDP listener listening on: ", s.UDPlistener.LocalAddr().String())

buf := make([]byte, UDP_MAX_PACKET_SIZE)
for {
select {
case <-s.done:
return nil
default:
n, _, err := s.listener.ReadFromUDP(buf)
n, _, err := s.UDPlistener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("E! Error READ: %s\n", err.Error())
continue
Expand Down Expand Up @@ -637,20 +743,111 @@ func (s *Statsd) aggregate(m metric) {
}
}

// handler serves a single TCP connection until the peer disconnects, a read
// fails, or s.done is closed.
//
// Every non-empty line read from conn is copied, terminated with '\n' and
// queued on s.in without blocking. If the queue is full the line is dropped
// and counted in s.drops.
//
// The caller is expected to have taken a token from s.accept, called
// s.wg.Add(1) and registered conn under id via remember; all of that is
// undone here when the function returns.
func (s *Statsd) handler(conn *net.TCPConn, id string) {
	s.CurrentConnections.Incr(1)
	s.TotalConnections.Incr(1)
	// Connection cleanup. wg.Done is signalled last so that a wg.Wait in
	// Stop cannot return while this connection is still being torn down.
	defer func() {
		conn.Close()
		s.forget(id)
		// Add one connection potential back to channel when this one closes
		s.accept <- true
		s.CurrentConnections.Incr(-1)
		s.wg.Done()
	}()

	scanner := bufio.NewScanner(conn)
	for {
		select {
		case <-s.done:
			return
		default:
		}

		if !scanner.Scan() {
			// Scan stops on EOF (Err is nil) or on a real error such as an
			// over-long line. Errors caused by the connection being closed
			// locally during shutdown are expected and not reported.
			if err := scanner.Err(); err != nil &&
				!strings.Contains(err.Error(), "closed network") {
				log.Printf("E! Error reading from statsd TCP connection %s: %s\n",
					conn.RemoteAddr(), err)
			}
			return
		}

		line := scanner.Bytes()
		n := len(line)
		if n == 0 {
			continue
		}
		s.BytesRecv.Incr(int64(n))
		s.PacketsRecv.Incr(1)

		// The scanner reuses its buffer on the next Scan, so the line must
		// be copied before it is handed to another goroutine. The trailing
		// newline stripped by the scanner is restored.
		bufCopy := make([]byte, n+1)
		copy(bufCopy, line)
		bufCopy[n] = '\n'

		select {
		case s.in <- bufCopy:
		default:
			// NOTE(review): s.drops is updated here without any locking
			// although one handler runs per connection - confirm whether
			// this counter needs synchronization.
			s.drops++
			// Guard against AllowedPendingMessages == 0, which would
			// otherwise make the modulo below panic with a division by zero.
			if s.drops == 1 || s.AllowedPendingMessages == 0 ||
				s.drops%s.AllowedPendingMessages == 0 {
				log.Printf(dropwarn, s.drops)
			}
		}
	}
}

// refuser turns away a TCP connection that cannot be served: the connection
// is closed immediately and two informational log lines are written, one
// naming the remote peer and one hinting at the max_tcp_connections setting.
func (s *Statsd) refuser(conn *net.TCPConn) {
	conn.Close()
	peer := conn.RemoteAddr()
	log.Printf("I! Refused TCP Connection from %s", peer)
	log.Printf("I! WARNING: Maximum TCP Connections reached, you may want to adjust max_tcp_connections")
}

// forget removes the connection registered under id from s.conns.
// The cleanup mutex is held while the map is modified; removing an id that
// is not present is a no-op.
func (s *Statsd) forget(id string) {
	s.cleanup.Lock()
	delete(s.conns, id)
	s.cleanup.Unlock()
}

// remember registers conn in s.conns under id so that it can be looked up
// and closed later. An existing entry with the same id is overwritten.
func (s *Statsd) remember(id string, conn *net.TCPConn) {
	s.cleanup.Lock()
	// Deferred so the mutex is released even if the map write panics
	// (writing to a nil map panics in Go).
	defer s.cleanup.Unlock()

	s.conns[id] = conn
}

// Stop shuts the statsd service down.
//
// It closes s.done to signal every goroutine, closes the listener for the
// configured protocol (and, for TCP, every tracked client connection so that
// blocked reads return), waits for all goroutines registered on s.wg to
// finish and finally closes the s.in channel.
//
// Any protocol value other than "tcp" is treated as UDP.
func (s *Statsd) Stop() {
	s.Lock()
	defer s.Unlock()
	log.Println("I! Stopping the statsd service")
	close(s.done)

	switch s.Protocol {
	case "tcp":
		s.TCPlistener.Close()
		// Close all open TCP connections
		//  - get all conns from the s.conns map and put into slice
		//  - this is so the forget() function doesnt conflict with looping
		//    over the s.conns map
		var conns []*net.TCPConn
		s.cleanup.Lock()
		for _, conn := range s.conns {
			conns = append(conns, conn)
		}
		s.cleanup.Unlock()
		for _, conn := range conns {
			conn.Close()
		}
	default:
		// "udp" and any unrecognised value.
		s.UDPlistener.Close()
	}

	// Only close s.in once every goroutine tracked by s.wg has returned.
	s.wg.Wait()
	close(s.in)
	log.Println("I! Stopped Statsd listener service on ", s.ServiceAddress)
}

func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
Protocol: defaultProtocol,
ServiceAddress: ":8125",
MaxTCPConnections: 250,
MetricSeparator: "_",
AllowedPendingMessages: defaultAllowPendingMessage,
DeleteCounters: true,
Expand Down
Loading