Skip to content

Commit

Permalink
Adding a TCP input listener
Browse files Browse the repository at this point in the history
closes #481
  • Loading branch information
sparrc committed Mar 7, 2016
1 parent 7e31279 commit 36976a5
Show file tree
Hide file tree
Showing 5 changed files with 556 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
- [#735](https://github.com/influxdata/telegraf/pull/735): SNMP Table feature. Thanks @titilambert!
- [#754](https://github.com/influxdata/telegraf/pull/754): docker plugin: adding `docker info` metrics to output. Thanks @titilambert!
- [#788](https://github.com/influxdata/telegraf/pull/788): -input-list and -output-list command-line options. Thanks @ebookbug!
- [#778](https://github.com/influxdata/telegraf/pull/778): Adding a TCP input listener.


### Bugfixes
- [#748](https://github.com/influxdata/telegraf/issues/748): Fix sensor plugin split on ":"
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/system"
_ "github.com/influxdata/telegraf/plugins/inputs/tcp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/trig"
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"
Expand Down
30 changes: 30 additions & 0 deletions plugins/inputs/tcp_listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# TCP listener service input plugin

The TCP listener is a service input plugin that listens for messages on a TCP
socket and adds those messages to InfluxDB.
The plugin expects messages in the
[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md).

### Configuration:

This is a sample configuration for the plugin.

```toml
# Generic TCP listener
[[inputs.tcp_listener]]
## Address and port to host TCP listener on
service_address = ":8094"

## Number of TCP messages allowed to queue up. Once filled, the
## TCP listener will start dropping packets.
allowed_pending_messages = 10000

## Maximum number of concurrent TCP connections to allow
max_tcp_connections = 250

## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
264 changes: 264 additions & 0 deletions plugins/inputs/tcp_listener/tcp_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package tcp_listener

import (
"bufio"
"fmt"
"log"
"net"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

type TcpListener struct {
ServiceAddress string
AllowedPendingMessages int
MaxTCPConnections int `toml:"max_tcp_connections"`

sync.Mutex
// Lock for preventing a data race during resource cleanup
cleanup sync.Mutex
wg sync.WaitGroup

in chan []byte
done chan struct{}
// accept channel tracks how many active connections there are, if there
// is an available bool in accept, then we are below the maximum and can
// accept the connection
accept chan bool

// track the listener here so we can close it in Stop()
listener *net.TCPListener
// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn

parser parsers.Parser
acc telegraf.Accumulator
}

var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"

const sampleConfig = `
## Address and port to host TCP listener on
service_address = ":8094"
## Number of TCP messages allowed to queue up. Once filled, the
## TCP listener will start dropping packets.
allowed_pending_messages = 10000
## Maximum number of concurrent TCP connections to allow
max_tcp_connections = 250
## Data format to consume. This can be "json", "influx" or "graphite"
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`

func (t *TcpListener) SampleConfig() string {
return sampleConfig
}

func (t *TcpListener) Description() string {
return "Generic TCP listener"
}

// All the work is done in the Start() function, so this is just a dummy
// function.
func (t *TcpListener) Gather(_ telegraf.Accumulator) error {
return nil
}

func (t *TcpListener) SetParser(parser parsers.Parser) {
t.parser = parser
}

// Start starts the tcp listener service.
func (t *TcpListener) Start(acc telegraf.Accumulator) error {
t.Lock()
defer t.Unlock()

t.acc = acc
t.in = make(chan []byte, t.AllowedPendingMessages)
t.done = make(chan struct{})
t.accept = make(chan bool, t.MaxTCPConnections)
t.conns = make(map[string]*net.TCPConn)
for i := 0; i < t.MaxTCPConnections; i++ {
t.accept <- true
}

// Start listener
var err error
address, _ := net.ResolveTCPAddr("tcp", t.ServiceAddress)
t.listener, err = net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
return err
}
log.Println("TCP server listening on: ", t.listener.Addr().String())

t.wg.Add(2)
go t.tcpListen()
go t.tcpParser()

log.Printf("Started TCP listener service on %s\n", t.ServiceAddress)
return nil
}

// Stop cleans up all resources
func (t *TcpListener) Stop() {
t.Lock()
defer t.Unlock()
close(t.done)
t.listener.Close()

// Close all open TCP connections
// - get all conns from the t.conns map and put into slice
// - this is so the forget() function doesnt conflict with looping
// over the t.conns map
var conns []*net.TCPConn
t.cleanup.Lock()
for _, conn := range t.conns {
conns = append(conns, conn)
}
t.cleanup.Unlock()
for _, conn := range conns {
conn.Close()
}

t.wg.Wait()
close(t.in)
log.Println("Stopped TCP listener service on ", t.ServiceAddress)
}

// tcpListen listens for incoming TCP connections.
func (t *TcpListener) tcpListen() error {
defer t.wg.Done()

for {
select {
case <-t.done:
return nil
default:
// Accept connection:
conn, err := t.listener.AcceptTCP()
if err != nil {
return err
}

log.Printf("Received TCP Connection from %s", conn.RemoteAddr())

select {
case <-t.accept:
// not over connection limit, handle the connection properly.
t.wg.Add(1)
// generate a random id for this TCPConn
id := internal.RandomString(6)
t.remember(id, conn)
go t.handler(conn, id)
default:
// We are over the connection limit, refuse & close.
t.refuser(conn)
}
}
}
}

// refuser refuses a TCP connection
func (t *TcpListener) refuser(conn *net.TCPConn) {
// Tell the connection why we are closing.
fmt.Fprintf(conn, "Telegraf maximum concurrent TCP connections (%d)"+
" reached, closing.\nYou may want to increase max_tcp_connections in"+
" the Telegraf tcp listener configuration.\n", t.MaxTCPConnections)
conn.Close()
log.Printf("Refused TCP Connection from %s", conn.RemoteAddr())
log.Printf("WARNING: Maximum TCP Connections reached, you may want to" +
" adjust max_tcp_connections")
}

// handler handles a single TCP Connection
func (t *TcpListener) handler(conn *net.TCPConn, id string) {
// connection cleanup function
defer func() {
t.wg.Done()
conn.Close()
log.Printf("Closed TCP Connection from %s", conn.RemoteAddr())
// Add one connection potential back to channel when this one closes
t.accept <- true
t.forget(id)
}()

scanner := bufio.NewScanner(conn)
for {
select {
case <-t.done:
return
default:
if !scanner.Scan() {
return
}
buf := scanner.Bytes()
select {
case t.in <- buf:
default:
log.Printf(dropwarn, string(buf))
}
}
}
}

// tcpParser parses the incoming tcp byte packets
func (t *TcpListener) tcpParser() error {
defer t.wg.Done()
for {
select {
case <-t.done:
return nil
case packet := <-t.in:
if len(packet) == 0 {
continue
}
metrics, err := t.parser.Parse(packet)
if err == nil {
t.storeMetrics(metrics)
} else {
log.Printf("Malformed packet: [%s], Error: %s\n",
string(packet), err)
}
}
}
}

func (t *TcpListener) storeMetrics(metrics []telegraf.Metric) error {
t.Lock()
defer t.Unlock()
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}

// forget a TCP connection
func (t *TcpListener) forget(id string) {
t.cleanup.Lock()
defer t.cleanup.Unlock()
delete(t.conns, id)
}

// remember a TCP connection
func (t *TcpListener) remember(id string, conn *net.TCPConn) {
t.cleanup.Lock()
defer t.cleanup.Unlock()
t.conns[id] = conn
}

func init() {
inputs.Add("tcp_listener", func() telegraf.Input {
return &TcpListener{}
})
}
Loading

0 comments on commit 36976a5

Please sign in to comment.