Skip to content

Commit

Permalink
Add a 404 and high-traffic test to http listener
Browse files Browse the repository at this point in the history
also remove locking around adding metrics. Instead, keep a waitgroup on
the ServeHTTP function and wait for that to finish before returning from
the Stop() function

closes #1407
  • Loading branch information
sparrc committed Sep 6, 2016
1 parent 67c288a commit 301c79e
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 39 deletions.
12 changes: 1 addition & 11 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [#1542](https://github.com/influxdata/telegraf/pull/1542): Add filestack webhook plugin.
- [#1599](https://github.com/influxdata/telegraf/pull/1599): Add server hostname for each docker measurements.
- [#1697](https://github.com/influxdata/telegraf/pull/1697): Add NATS output plugin.
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.

### Bugfixes

Expand Down Expand Up @@ -91,17 +92,6 @@ consistent with the behavior of `collection_jitter`.
- [#1434](https://github.com/influxdata/telegraf/pull/1434): Add measurement name arg to logparser plugin.
- [#1479](https://github.com/influxdata/telegraf/pull/1479): logparser: change resp_code from a field to a tag.
- [#1411](https://github.com/influxdata/telegraf/pull/1411): Implement support for fetching hddtemp data
- [#1407](https://github.com/influxdata/telegraf/pull/1407): HTTP service listener input plugin.

### Bugfixes

- [#1384](https://github.com/influxdata/telegraf/pull/1384): Fix datarace in apache input plugin.
- [#1399](https://github.com/influxdata/telegraf/issues/1399): Add `read_repairs` statistics to riak plugin.

## v1.0 beta 2 [2016-06-21]

### Features

- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric.
- [#1368](https://github.com/influxdata/telegraf/pull/1368): Add precision rounding to all metrics on collection.
- [#1390](https://github.com/influxdata/telegraf/pull/1390): Add support for Tengine
Expand Down
36 changes: 17 additions & 19 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type HttpListener struct {
WriteTimeout internal.Duration

sync.Mutex
wg sync.WaitGroup

listener *stoppableListener.StoppableListener

Expand Down Expand Up @@ -84,12 +85,13 @@ func (t *HttpListener) Stop() {
t.listener.Stop()
t.listener.Close()

t.wg.Wait()

log.Println("Stopped HTTP listener service on ", t.ServiceAddress)
}

// httpListen listens for HTTP requests.
func (t *HttpListener) httpListen() error {

if t.ReadTimeout.Duration < time.Second {
t.ReadTimeout.Duration = time.Second * 10
}
Expand All @@ -107,48 +109,44 @@ func (t *HttpListener) httpListen() error {
}

func (t *HttpListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
t.wg.Add(1)
defer t.wg.Done()
body, err := ioutil.ReadAll(req.Body)

if err != nil {
log.Printf("Problem reading request: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR reading request", http.StatusInternalServerError)
return
}

var path = req.URL.Path[1:]

if path == "write" {
switch req.URL.Path {
case "/write":
var metrics []telegraf.Metric
metrics, err = t.parser.Parse(body)
if err == nil {
t.storeMetrics(metrics)
for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
res.WriteHeader(http.StatusNoContent)
} else {
log.Printf("Problem parsing body: [%s], Error: %s\n", string(body), err)
http.Error(res, "ERROR parsing metrics", http.StatusInternalServerError)
}
} else if path == "query" {
// Deliver a dummy response to the query endpoint, as some InfluxDB clients test endpoint availability with a query
case "/query":
// Deliver a dummy response to the query endpoint, as some InfluxDB
// clients test endpoint availability with a query
res.Header().Set("Content-Type", "application/json")
res.Header().Set("X-Influxdb-Version", "1.0")
res.WriteHeader(http.StatusOK)
res.Write([]byte("{\"results\":[]}"))
} else {
case "/ping":
// respond to ping requests
res.WriteHeader(http.StatusNoContent)
default:
// Don't know how to respond to calls to other endpoints
http.NotFound(res, req)
}
}

func (t *HttpListener) storeMetrics(metrics []telegraf.Metric) error {
t.Lock()
defer t.Unlock()

for _, m := range metrics {
t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
return nil
}

func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HttpListener{}
Expand Down
71 changes: 62 additions & 9 deletions plugins/inputs/http_listener/http_listener_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package http_listener

import (
"sync"
"testing"
"time"

"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"

Expand All @@ -30,15 +30,14 @@ cpu_load_short,host=server06 value=12.0 1422568543702900257
func newTestHttpListener() *HttpListener {
listener := &HttpListener{
ServiceAddress: ":8186",
ReadTimeout: internal.Duration{Duration: time.Second * 10},
WriteTimeout: internal.Duration{Duration: time.Second * 10},
}
return listener
}

func TestWriteHTTP(t *testing.T) {
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
Expand All @@ -47,7 +46,7 @@ func TestWriteHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 25)

// post single message to listener
var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)

Expand All @@ -73,6 +72,55 @@ func TestWriteHTTP(t *testing.T) {
}
}

// writes 25,000 metrics to the listener with 10 different writers
func TestWriteHTTPHighTraffic(t *testing.T) {
listener := &HttpListener{ServiceAddress: ":8286"}
parser, _ := parsers.NewInfluxParser()
listener.SetParser(parser)

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()

time.Sleep(time.Millisecond * 25)

// post many messages to listener
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < 500; i++ {
resp, err := http.Post("http://localhost:8286/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgs)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}
wg.Done()
}()
}

wg.Wait()
time.Sleep(time.Millisecond * 50)
listener.Gather(acc)

require.Equal(t, int64(25000), int64(acc.NMetrics()))
}

func TestReceive404ForInvalidEndpoint(t *testing.T) {
listener := newTestHttpListener()
listener.parser, _ = parsers.NewInfluxParser()

acc := &testutil.Accumulator{}
require.NoError(t, listener.Start(acc))
defer listener.Stop()

time.Sleep(time.Millisecond * 25)

// post single message to listener
resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg)))
require.NoError(t, err)
require.EqualValues(t, 404, resp.StatusCode)
}

func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 250)

Expand All @@ -86,7 +134,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
time.Sleep(time.Millisecond * 25)

// post single message to listener
var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
require.NoError(t, err)
require.EqualValues(t, 500, resp.StatusCode)
}
Expand All @@ -104,12 +152,12 @@ func TestWriteHTTPEmpty(t *testing.T) {
time.Sleep(time.Millisecond * 25)

// post single message to listener
var resp, err = http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}

func TestQueryHTTP(t *testing.T) {
func TestQueryAndPingHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 250)

listener := newTestHttpListener()
Expand All @@ -122,7 +170,12 @@ func TestQueryHTTP(t *testing.T) {
time.Sleep(time.Millisecond * 25)

// post query to listener
var resp, err = http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
require.NoError(t, err)
require.EqualValues(t, 200, resp.StatusCode)

// post ping to listener
resp, err = http.Post("http://localhost:8186/ping", "", nil)
require.NoError(t, err)
require.EqualValues(t, 204, resp.StatusCode)
}

0 comments on commit 301c79e

Please sign in to comment.