Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for http status code as a field #1574

Merged
merged 2 commits into from
Sep 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- [#1485](https://github.com/influxdata/kapacitor/issues/1485): Add bools field types to UDFs.
- [#1545](https://github.com/influxdata/kapacitor/pull/1545): Add support for timeout, tags and service template in the Alerta AlertNode
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost

### Bugfixes

Expand Down
7 changes: 4 additions & 3 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a

for _, p := range n.HTTPPostHandlers {
c := httppost.HandlerConfig{
URL: p.URL,
Endpoint: p.Endpoint,
Headers: p.Headers,
URL: p.URL,
Endpoint: p.Endpoint,
Headers: p.Headers,
CaptureResponse: p.CaptureResponseFlag,
}
h := et.tm.HTTPPostService.Handler(c, ctx...)
an.handlers = append(an.handlers, h)
Expand Down
71 changes: 60 additions & 11 deletions http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ package kapacitor
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/pkg/errors"
)

type HTTPPostNode struct {
Expand Down Expand Up @@ -91,13 +95,35 @@ func (g *httpPostGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, error)

func (g *httpPostGroup) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error) {
row := batch.ToRow()
g.n.postRow(row)
code := g.n.doPost(row)
if g.n.c.CodeField != "" {
//Add code to all points
batch = batch.ShallowCopy()
points := make([]edge.BatchPointMessage, len(batch.Points()))
for i, bp := range batch.Points() {
fields := bp.Fields().Copy()
fields[g.n.c.CodeField] = int64(code)
points[i] = edge.NewBatchPointMessage(
fields,
bp.Tags(),
bp.Time(),
)
}
batch.SetPoints(points)
}
return batch, nil
}

func (g *httpPostGroup) Point(p edge.PointMessage) (edge.Message, error) {
row := p.ToRow()
g.n.postRow(row)
code := g.n.doPost(row)
if g.n.c.CodeField != "" {
//Add code to point
p = p.ShallowCopy()
fields := p.Fields().Copy()
fields[g.n.c.CodeField] = int64(code)
p.SetFields(fields)
}
return p, nil
}

Expand All @@ -108,30 +134,54 @@ func (g *httpPostGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, er
return d, nil
}

func (n *HTTPPostNode) postRow(row *models.Row) {
func (n *HTTPPostNode) doPost(row *models.Row) int {
resp, err := n.postRow(row)
if err != nil {
n.diag.Error("failed to POST data", err)
return 0
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
var err error
if n.c.CaptureResponseFlag {
var body []byte
body, err = ioutil.ReadAll(resp.Body)
if err == nil {
// Use the body content as the error
err = errors.New(string(body))
}
} else {
err = errors.New("unknown error, use .captureResponse() to capture the HTTP response")
}
n.diag.Error("POST returned non 2xx status code", err, keyvalue.KV("code", strconv.Itoa(resp.StatusCode)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just now understood what you were saying here #1535 (comment) about making everything strings.

}
return resp.StatusCode
}

func (n *HTTPPostNode) postRow(row *models.Row) (*http.Response, error) {
body := n.bp.Get()
defer n.bp.Put(body)

var contentType string
if n.endpoint.RowTemplate() != nil {
mr := newMappedRow(row)
n.endpoint.RowTemplate().Execute(body, mr)
err := n.endpoint.RowTemplate().Execute(body, mr)
if err != nil {
return nil, errors.Wrap(err, "failed to execute template")
}
} else {
result := new(models.Result)
result.Series = []*models.Row{row}
err := json.NewEncoder(body).Encode(result)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
return nil, errors.Wrap(err, "failed to marshal row data json")
}
contentType = "application/json"
}

req, err := n.endpoint.NewHTTPRequest(body)
if err != nil {
n.diag.Error("failed to marshal row data json", err)
return
return nil, errors.Wrap(err, "failed to marshal row data json")
}

if contentType != "" {
Expand All @@ -142,10 +192,9 @@ func (n *HTTPPostNode) postRow(row *models.Row) {
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
n.diag.Error("failed to POST row data", err)
return
return nil, err
}
resp.Body.Close()
return resp, nil
}

type mappedRow struct {
Expand Down
103 changes: 103 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2653,6 +2653,109 @@ stream
}
}

func TestStream_HttpPostEndpoint_StatusCodes(t *testing.T) {
headers := map[string]string{"my": "header"}
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for k, v := range headers {
nv := r.Header.Get(k)
if nv != v {
t.Fatalf("got '%s:%s', exp '%s:%s'", k, nv, k, v)
}
}
atomic.AddInt32(&requestCount, 1)
rc := atomic.LoadInt32(&requestCount)

switch rc {
case 1:
w.WriteHeader(http.StatusOK)
case 2:
w.WriteHeader(http.StatusCreated)
case 3:
w.WriteHeader(http.StatusNotFound)
case 4:
w.WriteHeader(http.StatusForbidden)
case 5:
w.WriteHeader(http.StatusInternalServerError)
case 6:
w.WriteHeader(http.StatusBadGateway)
}
}))
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|httpPost()
.endpoint('test')
.header('my', 'header')
.codeField('code')
|window()
.every(5s)
.period(5s)
|httpOut('TestStream_HttpPost')
`

er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "code", "type", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
200.0,
"idle",
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
201.0,
"idle",
92.6,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
404.0,
"idle",
95.6,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
403.0,
"idle",
93.1,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
500.0,
"idle",
92.6,
},
},
},
},
}

tmInit := func(tm *kapacitor.TaskMaster) {
c := httppost.Config{}
c.URL = ts.URL
c.Endpoint = "test"
sl, _ := httppost.NewService(httppost.Configs{c}, diagService.NewHTTPPostHandler())
tm.HTTPPostService = sl
}

testStreamerWithOutput(t, "TestStream_HttpPost", script, 13*time.Second, er, false, tmInit)

if rc := atomic.LoadInt32(&requestCount); rc != 6 {
t.Errorf("got %v exp %v", rc, 6)
}
}

func TestStream_HttpOutPassThrough(t *testing.T) {

var script = `
Expand Down
37 changes: 24 additions & 13 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,24 @@ func (a *AlertNode) Post(urls ...string) *AlertHTTPPostHandler {
return post
}

// tick:embedded:AlertNode.Post
type AlertHTTPPostHandler struct {
*AlertNode

// The POST URL.
// tick:ignore
URL string

// Name of the endpoint to be used, as is defined in the configuration file
Endpoint string

// tick:ignore
Headers map[string]string `tick:"Header"`

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`
}

// Set a header key and value on the post request.
// Setting the Authenticate header is not allowed from within TICKscript,
// please use the configuration file to specify sensitive headers.
Expand All @@ -527,19 +545,12 @@ func (a *AlertHTTPPostHandler) Header(k, v string) *AlertHTTPPostHandler {
return a
}

// tick:embedded:AlertNode.Post
type AlertHTTPPostHandler struct {
*AlertNode

// The POST URL.
// tick:ignore
URL string

// Name of the endpoint to be used, as is defined in the configuration file
Endpoint string

// tick:ignore
Headers map[string]string `tick:"Header"`
// CaptureResponse indicates that the HTTP response should be read and logged if
// the status code was not an 2xx code.
// tick:property
func (a *AlertHTTPPostHandler) CaptureResponse() *AlertHTTPPostHandler {
a.CaptureResponseFlag = true
return a
}

func (a *AlertHTTPPostHandler) validate() error {
Expand Down
17 changes: 16 additions & 1 deletion pipeline/http_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,16 @@ type HTTPPostNode struct {
// tick:ignore
Endpoints []string `tick:"Endpoint"`

// Headers
// tick:ignore
Headers map[string]string `tick:"Header"`

// CodeField is the name of the field in which to place the HTTP status code.
// If the HTTP request fails at a layer below HTTP, (i.e. rejected TCP connection), then the status code is set to 0.
CodeField string

// tick:ignore
CaptureResponseFlag bool `tick:"CaptureResponse"`

// tick:ignore
URLs []string
}
Expand Down Expand Up @@ -104,3 +111,11 @@ func (p *HTTPPostNode) Header(k, v string) *HTTPPostNode {

return p
}

// CaptureResponse indicates that the HTTP response should be read and logged if
// the status code was not an 2xx code.
// tick:property
func (p *HTTPPostNode) CaptureResponse() *HTTPPostNode {
p.CaptureResponseFlag = true
return p
}
9 changes: 7 additions & 2 deletions services/diagnostic/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,13 @@ type HTTPPostHandler struct {
l *klog.Logger
}

func (h *HTTPPostHandler) Error(msg string, err error) {
h.l.Error(msg, klog.Error(err))
func (h *HTTPPostHandler) Error(msg string, err error, ctx ...keyvalue.T) {
fields := make([]klog.Field, len(ctx)+1)
fields[0] = klog.Error(err)
for i, kv := range ctx {
fields[i+1] = klog.String(kv.Key, kv.Value)
}
h.l.Error(msg, fields...)
}

func (h *HTTPPostHandler) WithContext(ctx ...keyvalue.T) httppost.Diagnostic {
Expand Down
Loading