Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sideload http #1894

Merged
merged 11 commits into from
Dec 17, 2020
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

### Features
- [#1839](https://github.com/influxdata/kapacitor/pull/1839): Add Subscription path configuration option to allow Kapacitor to run behind a reverse proxy, thanks @aspring
- [#1894](https://github.com/influxdata/kapacitor/pull/1894): Add HTTP sources for sideload configuration, thanks @jregovic!
- [#2055](https://github.com/influxdata/kapacitor/pull/2055): Add support for correlate in the Alerta AlertNode, thanks @nermolaev!
- [#2409](https://github.com/influxdata/kapacitor/pull/2409): Optionally use kapacitor alert details as opsgenie description text, thanks @JamesClonk!
- [#2441](https://github.com/influxdata/kapacitor/pull/2441): Preallocate GroupIDs for increased performance by reducing allocations.
Expand Down Expand Up @@ -103,7 +104,6 @@
## v1.5.0 [2018-05-17]

### Features

- [#1842](https://github.com/influxdata/kapacitor/pull/1842): Add alert inhibitors that allow an alert to suppress events from other matching alerts.
- [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration.
- [#1844](https://github.com/influxdata/kapacitor/pull/1844): Added a new kapacitor node changeDetect that emits a value
Expand Down
24 changes: 24 additions & 0 deletions pipeline/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,31 @@ import (
//
// Add a field `cpu_threshold` and a tag `foo` to each point based on the value loaded from the hierarchical source.
// The list of templates in the `.order()` property are evaluated using the points tags.
// .source may be one of: a file URI, a URL, or an endpoint name as configured in the Kapacitor configuration file
// as an httppost object.
//
// If source is defined as a plain string and not a URI (file://,http://), the source wil interpreted as an endpoint
// defined in an [[httpost]] section in the Kapacitor configuration.
//
// A source defined as an HTTP URL or [[httppost]] endpoint will be loaded as an HTTP GET and loaded only once when a
// task is enabled, and then on subsequent calls to the /sideload/reload endpoint.
// An HTTP source endpoint should return a JSON document where each property is a key name specified in the order statement
// and a its value is an object with a set of key/value pairs.
// HTTP Source example:
//{
// "host1" : {
// "cpu_threshold":98,
// "some_tag": "value",
// "disable": "False"
// },
// "some_tag_value": {
// "cpu_threshold": 97
// "another_tag": "value"
// }
// }
//
// The files paths are checked then checked in order for the specified keys and the first value that is found is used.
// HTTP endpoints are checked in the same manner.
type SideloadNode struct {
chainnode

Expand Down
1 change: 1 addition & 0 deletions pipeline/tick/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) {
n.Pipe("sideload")

n.Dot("source", d.Source)

order := make([]interface{}, len(d.OrderList))
for i := range d.OrderList {
order[i] = d.OrderList[i]
Expand Down
10 changes: 5 additions & 5 deletions services/httppost/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Endpoint struct {
mu sync.RWMutex
urlTemplate *template.Template
headers map[string]string
auth BasicAuth
Auth BasicAuth
alertTemplate *template.Template
rowTemplate *template.Template
closed bool
Expand All @@ -43,7 +43,7 @@ func NewEndpoint(urlt *template.Template, headers map[string]string, auth BasicA
return &Endpoint{
urlTemplate: urlt,
headers: headers,
auth: auth,
Auth: auth,
alertTemplate: at,
rowTemplate: rt,
}
Expand All @@ -64,7 +64,7 @@ func (e *Endpoint) Update(c Config) error {
}
e.urlTemplate = ut
e.headers = c.Headers
e.auth = c.BasicAuth
e.Auth = c.BasicAuth
at, err := c.getAlertTemplate()
if err != nil {
return err
Expand Down Expand Up @@ -112,8 +112,8 @@ func (e *Endpoint) NewHTTPRequest(body io.Reader, tmplCtx interface{}) (req *htt
return nil, fmt.Errorf("failed to create POST request: %v", err)
}

if e.auth.valid() {
req.SetBasicAuth(e.auth.Username, e.auth.Password)
if e.Auth.valid() {
req.SetBasicAuth(e.Auth.Username, e.Auth.Password)
}

for k, v := range e.headers {
Expand Down
153 changes: 129 additions & 24 deletions services/sideload/service.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package sideload

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"sync"
"time"

"github.com/ghodss/yaml"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/pkg/errors"
)

Expand All @@ -32,7 +36,7 @@ type Service struct {
routes []httpd.Route

mu sync.Mutex
sources map[string]*source
sources map[string]Source

HTTPDService interface {
AddRoutes([]httpd.Route) error
Expand All @@ -43,7 +47,7 @@ type Service struct {
func NewService(d Diagnostic) *Service {
return &Service{
diag: d,
sources: make(map[string]*source),
sources: make(map[string]Source),
}
}

Expand Down Expand Up @@ -78,46 +82,93 @@ func (s *Service) Reload() error {
s.mu.Lock()
defer s.mu.Unlock()
for dir, src := range s.sources {
if err := src.updateCache(); err != nil {
if err := src.UpdateCache(); err != nil {
return errors.Wrapf(err, "failed to update source %q", dir)
}
}
return nil
}

func (s *Service) Source(srcURL string) (Source, error) {
u, err := url.Parse(srcURL)
func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) {
var src Source
buf := &bytes.Buffer{}
if err := endpoint.URL().Execute(buf, map[interface{}]string{}); err != nil {
return nil, err
}
u, err := url.Parse(buf.String())
if err != nil {
return nil, err
}
if u.Scheme != "file" {
return nil, fmt.Errorf("unsupported source scheme %q, must be 'file'", u.Scheme)
if u.Scheme != "file" && u.Scheme != "http" && u.Scheme != "https" {
return nil, fmt.Errorf("unsupported source scheme %q, must be 'file', 'http', or 'https'", u.Scheme)
}

if u.Scheme == "file" {
src, err = s.sourceFile(u.Path)
} else if u.Scheme == "http" || u.Scheme == "https" {
src, err = s.sourceHttp(endpoint, u.Scheme)
}
if !filepath.IsAbs(u.Path) {
return nil, fmt.Errorf("sideload source path must be absolute %q", u.Path)

return src, err
}

func (s *Service) sourceHttp(endpoint *httppost.Endpoint, scheme string) (Source, error) {
var err error
buf := &bytes.Buffer{}
if err := endpoint.URL().Execute(buf, map[interface{}]string{}); err != nil {
return nil, fmt.Errorf("Error creating request for sideload data from %s :: %s", buf.String(), err.Error())
}
dir := filepath.Clean(u.Path)

dir := buf.String()
s.mu.Lock()
defer s.mu.Unlock()
src, ok := s.sources[dir]
if !ok {
src = &httpSource{
fileSource: fileSource{
s: s,
dir: dir,
scheme: scheme,
},
e: endpoint,
}
err = src.UpdateCache()
if err != nil {
return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", dir, err.Error())
}
s.sources[dir] = src
}
src.addToReferenceCount(1)

return src, nil
}
func (s *Service) sourceFile(path string) (Source, error) {
if !filepath.IsAbs(path) {
return nil, fmt.Errorf("sideload source path must be absolute %q", path)
}
dir := filepath.Clean(path)
s.mu.Lock()
defer s.mu.Unlock()
src, ok := s.sources[dir]
if !ok {
src = &source{
s: s,
dir: dir,
src = &fileSource{
s: s,
dir: dir,
scheme: "file",
}
err := src.updateCache()
err := src.UpdateCache()
if err != nil {
return nil, err
}

s.sources[dir] = src
}
src.referenceCount++
src.addToReferenceCount(1)

return src, nil
}

func (s *Service) removeSource(src *source) {
}
func (s *Service) removeSource(src *fileSource) {
s.mu.Lock()
defer s.mu.Unlock()
src.referenceCount--
Expand All @@ -129,23 +180,29 @@ func (s *Service) removeSource(src *source) {
type Source interface {
Lookup(order []string, key string) interface{}
Close()
UpdateCache() error
addToReferenceCount(int) int
}

type source struct {
type fileSource struct {
s *Service
dir string
mu sync.RWMutex
scheme string
cache map[string]map[string]interface{}
referenceCount int
}

func (s *source) Close() {
func (s *fileSource) addToReferenceCount(i int) int {
s.referenceCount += i
return s.referenceCount
}

func (s *fileSource) Close() {
s.s.removeSource(s)
}

func (s *source) updateCache() error {
s.mu.Lock()
defer s.mu.Unlock()
func (s *fileSource) UpdateCache() error {
s.cache = make(map[string]map[string]interface{})
err := filepath.Walk(s.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
Expand All @@ -171,10 +228,10 @@ func (s *source) updateCache() error {
s.cache[rel] = values
return nil
})
return errors.Wrapf(err, "failed to update sideload cache for source %q", s.dir)
return errors.Wrapf(err, "failed to update sideload cache for source file %q", s.dir)
}

func (s *source) Lookup(order []string, key string) (value interface{}) {
func (s *fileSource) Lookup(order []string, key string) (value interface{}) {
key = filepath.Clean(key)

s.mu.RLock()
Expand All @@ -195,6 +252,40 @@ func (s *source) Lookup(order []string, key string) (value interface{}) {
return
}

type httpSource struct {
fileSource
e *httppost.Endpoint
}

func (s *httpSource) UpdateCache() error {
s.cache = make(map[string]map[string]interface{})
req, err := http.NewRequest("GET", s.dir, nil)
if err != nil {
return errors.Wrapf(err, "failed to generate request to update sideload cache for source %q", s.dir)
}
if s.e.Auth.Username != "" && s.e.Auth.Password != "" {
req.SetBasicAuth(s.e.Auth.Username, s.e.Auth.Password)
}

client := &http.Client{
Timeout: time.Second * 10,
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

values, err := loadValues(resp.Body)
if err != nil {
return errors.Wrapf(err, "failed to load body to update sideload cache for source %q", s.dir)
}
for k, v := range values {
s.cache[k] = v
}
return nil
}

func readValues(p string) (map[string]interface{}, error) {
f, err := os.Open(p)
if err != nil {
Expand All @@ -218,5 +309,19 @@ func readValues(p string) (map[string]interface{}, error) {
return nil, errors.Wrapf(err, "failed to unmarshal json values %q", p)
}
}

return values, nil
}

func loadValues(resp io.ReadCloser) (map[string]map[string]interface{}, error) {
data, err := ioutil.ReadAll(resp)
if err != nil {
return nil, errors.Wrapf(err, "Failed to read response body")
}
values := make(map[string]map[string]interface{})
if err := json.Unmarshal(data, &values); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal json values in response body")
}

return values, nil
}
9 changes: 8 additions & 1 deletion services/sideload/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/kapacitor/services/httppost"
"github.com/influxdata/kapacitor/services/sideload"
)

Expand All @@ -22,7 +23,13 @@ func TestService_Source_Lookup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
src, err := s.Source(fmt.Sprintf("file://%s/testdata/src0", wd))
conf := httppost.Config{URLTemplate: fmt.Sprintf("file://%s/testdata/src0", wd)}
e := &httppost.Endpoint{}
if err := e.Update(conf); err != nil {
t.Fatal(err)
}

src, err := s.Source(e)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading