Skip to content

Commit

Permalink
Add a generic HTTPPost node
Browse files Browse the repository at this point in the history
An HTTPPost node to mimic the behavior of HTTPOut, with the difference
that data is POST'd to an HTTP endpoint rather than made available at a
kapacitor endpoint.

Closes influxdata#1330
  • Loading branch information
sputnik13 committed Apr 20, 2017
1 parent 39c759d commit e7dd714
Show file tree
Hide file tree
Showing 9 changed files with 695 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ kapacitor define-handler system aggregate_by_1m.yaml
### Features

- [#1322](https://github.com/influxdata/kapacitor/pull/1322): TLS configuration in Slack service for Mattermost compatibility
- [#1330](https://github.com/influxdata/kapacitor/issues/1330): Generic HTTP Post node
- [#1159](https://github.com/influxdata/kapacitor/pulls/1159): Go version 1.7.4 -> 1.7.5
- [#1175](https://github.com/influxdata/kapacitor/pull/1175): BREAKING: Add generic error counters to every node type.
Renamed `query_errors` to `errors` in batch node.
Expand Down
84 changes: 84 additions & 0 deletions http_post.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package kapacitor

import (
"encoding/json"
"log"
"net/http"
"sync"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

type HTTPPostNode struct {
node
c *pipeline.HTTPPostNode
url string
mu sync.RWMutex
bp *bufpool.Pool
}

// Create a new HTTPPostNode which caches the most recent item and exposes it over the HTTP API.
func newHTTPPostNode(et *ExecutingTask, n *pipeline.HTTPPostNode, l *log.Logger) (*HTTPPostNode, error) {
hn := &HTTPPostNode{
node: node{Node: n, et: et, logger: l},
c: n,
bp: bufpool.New(),
url: n.Url,
}
hn.node.runF = hn.runPost
return hn, nil
}

func (h *HTTPPostNode) runPost([]byte) error {
switch h.Wants() {
case pipeline.StreamEdge:
for p, ok := h.ins[0].NextPoint(); ok; p, ok = h.ins[0].NextPoint() {
h.timer.Start()
row := models.PointToRow(p)
h.postRow(p.Group, row)
h.timer.Stop()
for _, child := range h.outs {
err := child.CollectPoint(p)
if err != nil {
return err
}
}
}
case pipeline.BatchEdge:
for b, ok := h.ins[0].NextBatch(); ok; b, ok = h.ins[0].NextBatch() {
h.timer.Start()
row := models.BatchToRow(b)
h.postRow(b.Group, row)
h.timer.Stop()
for _, child := range h.outs {
err := child.CollectBatch(b)
if err != nil {
return err
}
}
}
}
return nil
}

// Update the result structure with a row.
func (h *HTTPPostNode) postRow(group models.GroupID, row *models.Row) {
result := new(models.Result)
result.Series = []*models.Row{row}

body := h.bp.Get()
defer h.bp.Put(body)
err := json.NewEncoder(body).Encode(result)
if err != nil {
h.logger.Printf("E! failed to marshal row data json: %v", err)
return
}

resp, err := http.Post(h.url, "application/json", body)
if err != nil {
h.logger.Printf("E! failed to POST row data: %v", err)
}
resp.Body.Close()
}
Loading

0 comments on commit e7dd714

Please sign in to comment.