Skip to content

Commit

Permalink
Add support for endpoints and verify multi-key behavior
Browse files Browse the repository at this point in the history
Support HTTPS

Update tests for new format.
  • Loading branch information
Jim Regovich committed Jun 1, 2018
1 parent c3f55ad commit 614254d
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 62 deletions.
27 changes: 24 additions & 3 deletions pipeline/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,31 @@ import (
//
// Add a field `cpu_threshold` and a tag `foo` to each point based on the value loaded from the hierarchical source.
// The list of templates in the `.order()` property are evaluated using the points tags.
// .source may be one of: a file URI, a URL, or an endpoint name as configured in the kapacitor configuration file
// as an httppost object.
//
// If source is defiend as a plain string and not a URI (file://,http://), the source wil intepreseted as an endpoint
// defined in an [[httpost]] section in the kapacitor configuration.
//
// A source defined as an HTTP URL or [[httppost]] endpoint will be loaded as an HTTP GET and loaded only once when a
// task is enabled, and then on subsequent calls to the /sideload/reload endpoint.
// An HTTP source ednpoint should return a JSON document where each property is a key name specified in the order statement
// and a its value is an object with a set of key/value pairs.
// HTTP Source example:
//{
// "host1" : {
// "cpu_threhsold":98,
// "some_tag": "value",
// "disable": "False"
// },
// "some_tag_value": {
// "cpu_threshold": 97
// "another_tag": "value"
// }
// }
//
// The files paths are checked then checked in order for the specified keys and the first value that is found is used.
// HTTP endpoints are checked in the same manner.
type SideloadNode struct {
chainnode

Expand All @@ -35,9 +59,6 @@ type SideloadNode struct {
// Tags is a list of tags to load.
// tick:ignore
Tags map[string]string `tick:"Tag" json:"tags"`

HttpUser string `json:"httpuser"`
HttpPassword string `json:"httppassword"`
}

func newSideloadNode(wants EdgeType) *SideloadNode {
Expand Down
18 changes: 5 additions & 13 deletions pipeline/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (

func TestSideloadNode_MarshalJSON(t *testing.T) {
type fields struct {
Source string
Order []string
Fields map[string]interface{}
Tags map[string]string
HttpUser string
HttpPassword string
Source string
Order []string
Fields map[string]interface{}
Tags map[string]string
}
tests := []struct {
name string
Expand All @@ -32,8 +30,6 @@ func TestSideloadNode_MarshalJSON(t *testing.T) {
"t1": "k1",
"t2": "",
},
HttpUser: "",
HttpPassword: "",
},
want: `{
"typeOf": "sideload",
Expand All @@ -51,9 +47,7 @@ func TestSideloadNode_MarshalJSON(t *testing.T) {
"tags": {
"t1": "k1",
"t2": ""
},
"httpuser": "",
"httppassword": ""
}
}`,
},
}
Expand All @@ -65,8 +59,6 @@ func TestSideloadNode_MarshalJSON(t *testing.T) {
w.OrderList = tt.fields.Order
w.Fields = tt.fields.Fields
w.Tags = tt.fields.Tags
w.HttpUser = tt.fields.HttpUser
w.HttpPassword = tt.fields.HttpPassword
MarshalIndentTestHelper(t, w, tt.wantErr, tt.want)
})
}
Expand Down
3 changes: 1 addition & 2 deletions pipeline/tick/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) {
n.Pipe("sideload")

n.Dot("source", d.Source)
n.Dot("httpUser", d.HttpUser)
n.Dot("httpPassword", d.HttpPassword)

order := make([]interface{}, len(d.OrderList))
for i := range d.OrderList {
order[i] = d.OrderList[i]
Expand Down
4 changes: 0 additions & 4 deletions pipeline/tick/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@ func TestSideload(t *testing.T) {
def.Field("finance", "loan")
def.Tag("vocabulary", "volcano")
def.Tag("make", "toyota")
def.HttpUser = ("user")
def.HttpPassword = ("password")

want := `stream
|from()
|sideload()
.source('file:///tmpdir')
.httpUser('user')
.httpPassword('password')
.order('a', 'b', 'c')
.field('finance', 'loan')
.field('judgement', 'plantiff')
Expand Down
20 changes: 10 additions & 10 deletions services/httppost/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ type Diagnostic interface {
// Only one of name and url should be non-empty
type Endpoint struct {
mu sync.RWMutex
url string
Url string
headers map[string]string
auth BasicAuth
Auth BasicAuth
alertTemplate *template.Template
rowTemplate *template.Template
closed bool
}

func NewEndpoint(url string, headers map[string]string, auth BasicAuth, at, rt *template.Template) *Endpoint {
return &Endpoint{
url: url,
Url: url,
headers: headers,
auth: auth,
Auth: auth,
alertTemplate: at,
rowTemplate: rt,
}
Expand All @@ -54,9 +54,9 @@ func (e *Endpoint) Close() {
func (e *Endpoint) Update(c Config) error {
e.mu.Lock()
defer e.mu.Unlock()
e.url = c.URL
e.Url = c.URL
e.headers = c.Headers
e.auth = c.BasicAuth
e.Auth = c.BasicAuth
at, err := c.getAlertTemplate()
if err != nil {
return err
Expand Down Expand Up @@ -89,13 +89,13 @@ func (e *Endpoint) NewHTTPRequest(body io.Reader) (req *http.Request, err error)
return nil, errors.New("endpoint was closed")
}

req, err = http.NewRequest("POST", e.url, body)
req, err = http.NewRequest("POST", e.Url, body)
if err != nil {
return nil, fmt.Errorf("failed to create POST request: %v", err)
}

if e.auth.valid() {
req.SetBasicAuth(e.auth.Username, e.auth.Password)
if e.Auth.valid() {
req.SetBasicAuth(e.Auth.Username, e.Auth.Password)
}

for k, v := range e.headers {
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *Service) Test(options interface{}) error {
// Create the HTTP request
var req *http.Request
e := &Endpoint{
url: o.URL,
Url: o.URL,
headers: o.Headers,
}
req, err = e.NewHTTPRequest(body)
Expand Down
38 changes: 20 additions & 18 deletions services/sideload/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ghodss/yaml"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -87,29 +88,29 @@ func (s *Service) Reload() error {
return nil
}

func (s *Service) Source(srcURL string) (Source, error) {
func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) {
var src Source

u, err := url.Parse(srcURL)
u, err := url.Parse(endpoint.Url)
if err != nil {
return nil, err
}
if u.Scheme != "file" && u.Scheme != "http" {
if u.Scheme != "file" && u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("unsupported source scheme %q, must be 'file' or 'http'", u.Scheme)
}

if u.Scheme == "file" {
src, err = s.SourceFile(u.Path)
} else if u.Scheme == "http" {
src, err = s.SourceHttp(srcURL)
} else if u.Scheme == "http" || u.Scheme == "https" {
src, err = s.SourceHttp(endpoint, u.Scheme)
}

return src, err
}

func (s *Service) SourceHttp(srcURL string) (Source, error) {
func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source, error) {
var err error
dir := srcURL
dir := endpoint.Url
s.mu.Lock()
defer s.mu.Unlock()
/*
Expand All @@ -122,7 +123,8 @@ func (s *Service) SourceHttp(srcURL string) (Source, error) {
src = &source{
s: s,
dir: dir,
scheme: "http",
scheme: scheme,
e: endpoint,
}
err = src.updateCache()
if err != nil {
Expand All @@ -133,7 +135,7 @@ func (s *Service) SourceHttp(srcURL string) (Source, error) {
src.referenceCount++

if err != nil {
return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", srcURL, err.Error())
return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", dir, err.Error())
}
return src, nil
}
Expand Down Expand Up @@ -178,12 +180,11 @@ type Source interface {
}

type source struct {
s *Service
scheme string
dir string
mu sync.RWMutex
httpUser string
httpPassword string
s *Service
scheme string
dir string
mu sync.RWMutex
e *httppost.Endpoint

cache map[string]map[string]interface{}
referenceCount int
Expand Down Expand Up @@ -222,9 +223,10 @@ func (s *source) updateCacheFile() error {
}
func (s *source) updateCacheHttp() error {
req, err := http.NewRequest("GET", s.dir, nil)
if s.httpUser != "" {
req.SetBasicAuth(s.httpUser, s.httpPassword)
if s.e.Auth.Username != "" && s.e.Auth.Password != "" {
req.SetBasicAuth(s.e.Auth.Username, s.e.Auth.Password)
}

client := &http.Client{
Timeout: time.Second * 10,
}
Expand All @@ -251,7 +253,7 @@ func (s *source) updateCache() error {

if s.scheme == "file" {
return s.updateCacheFile()
} else if s.scheme == "http" {
} else if s.scheme == "http" || s.scheme == "https" {
return s.updateCacheHttp()
}
return nil
Expand Down
6 changes: 5 additions & 1 deletion services/sideload/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/sideload"
)

Expand All @@ -22,7 +23,10 @@ func TestService_Source_Lookup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
src, err := s.Source(fmt.Sprintf("file://%s/testdata/src0", wd))
e := &httppost.Endpoint{
Url: fmt.Sprintf("file://%s/testdata/src0", wd),
}
src, err := s.Source(e)
if err != nil {
t.Fatal(err)
}
Expand Down
38 changes: 28 additions & 10 deletions sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,63 @@ package kapacitor

import (
"fmt"
"strconv"
"text/template"
text "text/template"

"github.com/influxdata/kapacitor/bufpool"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/sideload"
"github.com/pkg/errors"
"log"
"net/url"
"strconv"
"text/template"
text "text/template"
)

type SideloadNode struct {
node
s *pipeline.SideloadNode
source sideload.Source
orderTmpls []orderTmpl

order []string
httpUser string
httpPassword string
bufferPool *bufpool.Pool
Endpoint *httppost.Endpoint
order []string
bufferPool *bufpool.Pool
}

// Create a new SideloadNode which loads fields and tags from external sources.
func newSideloadNode(et *ExecutingTask, n *pipeline.SideloadNode, d NodeDiagnostic) (*SideloadNode, error) {
var e *httppost.Endpoint
var ok bool
sn := &SideloadNode{
node: node{Node: n, et: et, diag: d},
s: n,
bufferPool: bufpool.New(),
order: make([]string, len(n.OrderList)),
orderTmpls: make([]orderTmpl, len(n.OrderList)),
}
src, err := et.tm.SideloadService.Source(n.Source)
u, err := url.Parse(n.Source)
if err != nil {
log.Fatal(err)
}
if u.Scheme == "" {
e, ok = et.tm.HTTPPostService.Endpoint(n.Source)
if !ok {
log.Fatal("Specified endpoint does not exist: " + n.Source)
}
} else {
e = &httppost.Endpoint{
Url: n.Source,
}
}
sn.Endpoint = e
src, err := et.tm.SideloadService.Source(e)
if err != nil {
return nil, err
}
sn.source = src

for i, o := range n.OrderList {
op, err := newOrderTmpl(o, sn.bufferPool)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type TaskMaster struct {
}

SideloadService interface {
Source(dir string) (sideload.Source, error)
Source(*httppost.Endpoint) (sideload.Source, error)
}

Commander command.Commander
Expand Down

0 comments on commit 614254d

Please sign in to comment.