From 45a23ccfa38e76570c255dc4aa33de7306b45979 Mon Sep 17 00:00:00 2001 From: Jim Regovich Date: Thu, 19 Apr 2018 10:44:21 -0500 Subject: [PATCH 1/5] Add HTTP urls as an option for a sideload source. --- pipeline/sideload.go | 3 + pipeline/sideload_test.go | 18 +++-- pipeline/tick/sideload.go | 2 + pipeline/tick/sideload_test.go | 4 ++ services/sideload/service.go | 127 +++++++++++++++++++++++++++++---- sideload.go | 7 +- 6 files changed, 138 insertions(+), 23 deletions(-) diff --git a/pipeline/sideload.go b/pipeline/sideload.go index ecd029c4e..db52f7754 100644 --- a/pipeline/sideload.go +++ b/pipeline/sideload.go @@ -35,6 +35,9 @@ type SideloadNode struct { // Tags is a list of tags to load. // tick:ignore Tags map[string]string `tick:"Tag" json:"tags"` + + HttpUser string `json:"httpuser"` + HttpPassword string `json:"httppassword"` } func newSideloadNode(wants EdgeType) *SideloadNode { diff --git a/pipeline/sideload_test.go b/pipeline/sideload_test.go index c77074784..676e06616 100644 --- a/pipeline/sideload_test.go +++ b/pipeline/sideload_test.go @@ -6,10 +6,12 @@ import ( func TestSideloadNode_MarshalJSON(t *testing.T) { type fields struct { - Source string - Order []string - Fields map[string]interface{} - Tags map[string]string + Source string + Order []string + Fields map[string]interface{} + Tags map[string]string + HttpUser string + HttpPassword string } tests := []struct { name string @@ -30,6 +32,8 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { "t1": "k1", "t2": "", }, + HttpUser: "", + HttpPassword: "", }, want: `{ "typeOf": "sideload", @@ -47,7 +51,9 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { "tags": { "t1": "k1", "t2": "" - } + }, + "httpuser": "", + "httppassword": "" }`, }, } @@ -59,6 +65,8 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { w.OrderList = tt.fields.Order w.Fields = tt.fields.Fields w.Tags = tt.fields.Tags + w.HttpUser = tt.fields.HttpUser + w.HttpPassword = tt.fields.HttpPassword MarshalIndentTestHelper(t, w, tt.wantErr, tt.want) }) } diff --git a/pipeline/tick/sideload.go b/pipeline/tick/sideload.go index 1ad5d4a6e..24501ac71 100644 --- a/pipeline/tick/sideload.go +++ b/pipeline/tick/sideload.go @@ -26,6 +26,8 @@ func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) { n.Pipe("sideload") n.Dot("source", d.Source) + n.Dot("httpUser", d.HttpUser) + n.Dot("httpPassword", d.HttpPassword) order := make([]interface{}, len(d.OrderList)) for i := range d.OrderList { order[i] = d.OrderList[i] diff --git a/pipeline/tick/sideload_test.go b/pipeline/tick/sideload_test.go index 3f8114381..b2438770b 100644 --- a/pipeline/tick/sideload_test.go +++ b/pipeline/tick/sideload_test.go @@ -13,11 +13,15 @@ func TestSideload(t *testing.T) { def.Field("finance", "loan") def.Tag("vocabulary", "volcano") def.Tag("make", "toyota") + def.HttpUser = ("user") + def.HttpPassword = ("password") want := `stream |from() |sideload() .source('file:///tmpdir') + .httpUser('user') + .httpPassword('password') .order('a', 'b', 'c') .field('finance', 'loan') .field('judgement', 'plantiff') diff --git a/services/sideload/service.go b/services/sideload/service.go index acbc900cc..69f00f22b 100644 --- a/services/sideload/service.go +++ b/services/sideload/service.go @@ -3,12 +3,14 @@ package sideload import ( "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" "os" "path/filepath" "sync" + "time" "github.com/ghodss/yaml" "github.com/influxdata/kapacitor/keyvalue" @@ -86,37 +88,81 @@ func (s *Service) Reload() error { } func (s *Service) Source(srcURL string) (Source, error) { + var src Source + u, err := url.Parse(srcURL) if err != nil { return nil, err } - if u.Scheme != "file" { - return nil, fmt.Errorf("unsupported source scheme %q, must be 'file'", u.Scheme) + if u.Scheme != "file" && u.Scheme != "http" { + return nil, fmt.Errorf("unsupported source scheme %q, must be 'file' or 'http'", u.Scheme) } - if !filepath.IsAbs(u.Path) { - return nil, fmt.Errorf("sideload source path must be absolute %q", u.Path) + + if u.Scheme == "file" { + src, err = s.SourceFile(u.Path) + } else if u.Scheme == "http" { + src, err = s.SourceHttp(srcURL) } - dir := filepath.Clean(u.Path) + + return src, err +} + +func (s *Service) SourceHttp(srcURL string) (Source, error) { + var err error + dir := srcURL s.mu.Lock() defer s.mu.Unlock() + /* + if err != nil { + return nil,fmt.Errorf("Error creating request for sideload data from %s :: %s",srcURL,err.Error()) + } + */ + src, ok := s.sources[dir] + if !ok { + src = &source{ + s: s, + dir: dir, + scheme: "http", + } + err = src.updateCache() + if err != nil { + return nil, err + } + s.sources[dir] = src + } + src.referenceCount++ + if err != nil { + return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", srcURL, err.Error()) + } + return src, nil +} +func (s *Service) SourceFile(path string) (Source, error) { + if !filepath.IsAbs(path) { + return nil, fmt.Errorf("sideload source path must be absolute %q", path) + } + dir := filepath.Clean(path) + s.mu.Lock() + defer s.mu.Unlock() src, ok := s.sources[dir] if !ok { src = &source{ - s: s, - dir: dir, + s: s, + dir: dir, + scheme: "file", } err := src.updateCache() if err != nil { return nil, err } + s.sources[dir] = src } src.referenceCount++ return src, nil -} +} func (s *Service) removeSource(src *source) { s.mu.Lock() defer s.mu.Unlock() @@ -132,9 +178,13 @@ type Source interface { } type source struct { - s *Service - dir string - mu sync.RWMutex + s *Service + scheme string + dir string + mu sync.RWMutex + httpUser string + httpPassword string + cache map[string]map[string]interface{} referenceCount int } @@ -143,10 +193,7 @@ func (s *source) Close() { s.s.removeSource(s) } -func (s *source) updateCache() error { - s.mu.Lock() - defer s.mu.Unlock() - s.cache = make(map[string]map[string]interface{}) +func (s *source) updateCacheFile() error { err := filepath.Walk(s.dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -173,6 +220,42 @@ func (s *source) updateCache() error { }) return errors.Wrapf(err, "failed to update sideload cache for source %q", s.dir) } +func (s *source) updateCacheHttp() error { + req, err := http.NewRequest("GET", s.dir, nil) + if s.httpUser != "" { + req.SetBasicAuth(s.httpUser, s.httpPassword) + } + client := &http.Client{ + Timeout: time.Second * 10, + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + values, err := loadValues(resp.Body) + if err != nil { + return errors.Wrapf(err, "failed to update sideload cache for source %q", s.dir) + } + for k, v := range values { + s.cache[k] = v + } + return nil +} + +func (s *source) updateCache() error { + s.mu.Lock() + defer s.mu.Unlock() + s.cache = make(map[string]map[string]interface{}) + + if s.scheme == "file" { + return s.updateCacheFile() + } else if s.scheme == "http" { + return s.updateCacheHttp() + } + return nil +} func (s *source) Lookup(order []string, key string) (value interface{}) { key = filepath.Clean(key) @@ -218,5 +301,19 @@ func readValues(p string) (map[string]interface{}, error) { return nil, errors.Wrapf(err, "failed to unmarshal json values %q", p) } } + + return values, nil +} + +func loadValues(resp io.ReadCloser) (map[string]map[string]interface{}, error) { + data, err := ioutil.ReadAll(resp) + if err != nil { + return nil, errors.Wrapf(err, "Failed to read response body") + } + values := make(map[string]map[string]interface{}) + if err := json.Unmarshal(data, &values); err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal json values in response body") + } + return values, nil } diff --git a/sideload.go b/sideload.go index 1e935d92f..d8d426577 100644 --- a/sideload.go +++ b/sideload.go @@ -21,9 +21,10 @@ type SideloadNode struct { source sideload.Source orderTmpls []orderTmpl - order []string - - bufferPool *bufpool.Pool + order []string + httpUser string + httpPassword string + bufferPool *bufpool.Pool } // Create a new SideloadNode which loads fields and tags from external sources. From c3f55ad5a6f7a7711494fe5955713b19957a1b14 Mon Sep 17 00:00:00 2001 From: Jim Regovich Date: Thu, 26 Apr 2018 11:16:49 -0500 Subject: [PATCH 2/5] Add PR info to CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 938f23c1d..3b878ecc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## v1.5.0 [unreleased] ### Features - +- [#1894](https://github.com/influxdata/kapacitor/pull/1894): Add HTTP sources for sideload configuration. - [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration. - [#1844](https://github.com/influxdata/kapacitor/pull/1844): Added a new kapacitor node changeDetect that emits a value for each time a series field changes. From 614254def1d405d5d2d8cad7c34350032dba0013 Mon Sep 17 00:00:00 2001 From: Jim Regovich Date: Thu, 31 May 2018 11:28:12 -0500 Subject: [PATCH 3/5] Add support for endpoints and verify multi-key behavior Support HTTPS Update tests for new format. --- pipeline/sideload.go | 27 +++++++++++++++++++--- pipeline/sideload_test.go | 18 ++++----------- pipeline/tick/sideload.go | 3 +-- pipeline/tick/sideload_test.go | 4 ---- services/httppost/service.go | 20 ++++++++-------- services/sideload/service.go | 38 ++++++++++++++++--------------- services/sideload/service_test.go | 6 ++++- sideload.go | 38 +++++++++++++++++++++++-------- task_master.go | 2 +- 9 files changed, 94 insertions(+), 62 deletions(-) diff --git a/pipeline/sideload.go b/pipeline/sideload.go index db52f7754..a86625dcf 100644 --- a/pipeline/sideload.go +++ b/pipeline/sideload.go @@ -16,7 +16,31 @@ import ( // // Add a field `cpu_threshold` and a tag `foo` to each point based on the value loaded from the hierarchical source. // The list of templates in the `.order()` property are evaluated using the points tags. +// .source may be one of: a file URI, a URL, or an endpoint name as configured in the kapacitor configuration file +// as an httppost object. +// +// If source is defiend as a plain string and not a URI (file://,http://), the source wil intepreseted as an endpoint +// defined in an [[httpost]] section in the kapacitor configuration. +// +// A source defined as an HTTP URL or [[httppost]] endpoint will be loaded as an HTTP GET and loaded only once when a +// task is enabled, and then on subsequent calls to the /sideload/reload endpoint. +// An HTTP source ednpoint should return a JSON document where each property is a key name specified in the order statement +// and a its value is an object with a set of key/value pairs. +// HTTP Source example: +//{ +// "host1" : { +// "cpu_threhsold":98, +// "some_tag": "value", +// "disable": "False" +// }, +// "some_tag_value": { +// "cpu_threshold": 97 +// "another_tag": "value" +// } +// } +// // The files paths are checked then checked in order for the specified keys and the first value that is found is used. +// HTTP endpoints are checked in the same manner. type SideloadNode struct { chainnode @@ -35,9 +59,6 @@ type SideloadNode struct { // Tags is a list of tags to load. // tick:ignore Tags map[string]string `tick:"Tag" json:"tags"` - - HttpUser string `json:"httpuser"` - HttpPassword string `json:"httppassword"` } func newSideloadNode(wants EdgeType) *SideloadNode { diff --git a/pipeline/sideload_test.go b/pipeline/sideload_test.go index 676e06616..c77074784 100644 --- a/pipeline/sideload_test.go +++ b/pipeline/sideload_test.go @@ -6,12 +6,10 @@ import ( func TestSideloadNode_MarshalJSON(t *testing.T) { type fields struct { - Source string - Order []string - Fields map[string]interface{} - Tags map[string]string - HttpUser string - HttpPassword string + Source string + Order []string + Fields map[string]interface{} + Tags map[string]string } tests := []struct { name string @@ -32,8 +30,6 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { "t1": "k1", "t2": "", }, - HttpUser: "", - HttpPassword: "", }, want: `{ "typeOf": "sideload", @@ -51,9 +47,7 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { "tags": { "t1": "k1", "t2": "" - }, - "httpuser": "", - "httppassword": "" + } }`, }, } @@ -65,8 +59,6 @@ func TestSideloadNode_MarshalJSON(t *testing.T) { w.OrderList = tt.fields.Order w.Fields = tt.fields.Fields w.Tags = tt.fields.Tags - w.HttpUser = tt.fields.HttpUser - w.HttpPassword = tt.fields.HttpPassword MarshalIndentTestHelper(t, w, tt.wantErr, tt.want) }) } diff --git a/pipeline/tick/sideload.go b/pipeline/tick/sideload.go index 24501ac71..f40e2e492 100644 --- a/pipeline/tick/sideload.go +++ b/pipeline/tick/sideload.go @@ -26,8 +26,7 @@ func (n *SideloadNode) Build(d *pipeline.SideloadNode) (ast.Node, error) { n.Pipe("sideload") n.Dot("source", d.Source) - n.Dot("httpUser", d.HttpUser) - n.Dot("httpPassword", d.HttpPassword) + order := make([]interface{}, len(d.OrderList)) for i := range d.OrderList { order[i] = d.OrderList[i] diff --git a/pipeline/tick/sideload_test.go b/pipeline/tick/sideload_test.go index b2438770b..3f8114381 100644 --- a/pipeline/tick/sideload_test.go +++ b/pipeline/tick/sideload_test.go @@ -13,15 +13,11 @@ func TestSideload(t *testing.T) { def.Field("finance", "loan") def.Tag("vocabulary", "volcano") def.Tag("make", "toyota") - def.HttpUser = ("user") - def.HttpPassword = ("password") want := `stream |from() |sideload() .source('file:///tmpdir') - .httpUser('user') - .httpPassword('password') .order('a', 'b', 'c') .field('finance', 'loan') .field('judgement', 'plantiff') diff --git a/services/httppost/service.go b/services/httppost/service.go index 87edf3184..58dde1bad 100644 --- a/services/httppost/service.go +++ b/services/httppost/service.go @@ -27,9 +27,9 @@ type Diagnostic interface { // Only one of name and url should be non-empty type Endpoint struct { mu sync.RWMutex - url string + Url string headers map[string]string - auth BasicAuth + Auth BasicAuth alertTemplate *template.Template rowTemplate *template.Template closed bool @@ -37,9 +37,9 @@ type Endpoint struct { func NewEndpoint(url string, headers map[string]string, auth BasicAuth, at, rt *template.Template) *Endpoint { return &Endpoint{ - url: url, + Url: url, headers: headers, - auth: auth, + Auth: auth, alertTemplate: at, rowTemplate: rt, } @@ -54,9 +54,9 @@ func (e *Endpoint) Close() { func (e *Endpoint) Update(c Config) error { e.mu.Lock() defer e.mu.Unlock() - e.url = c.URL + e.Url = c.URL e.headers = c.Headers - e.auth = c.BasicAuth + e.Auth = c.BasicAuth at, err := c.getAlertTemplate() if err != nil { return err @@ -89,13 +89,13 @@ func (e *Endpoint) NewHTTPRequest(body io.Reader) (req *http.Request, err error) return nil, errors.New("endpoint was closed") } - req, err = http.NewRequest("POST", e.url, body) + req, err = http.NewRequest("POST", e.Url, body) if err != nil { return nil, fmt.Errorf("failed to create POST request: %v", err) } - if e.auth.valid() { - req.SetBasicAuth(e.auth.Username, e.auth.Password) + if e.Auth.valid() { + req.SetBasicAuth(e.Auth.Username, e.Auth.Password) } for k, v := range e.headers { @@ -217,7 +217,7 @@ func (s *Service) Test(options interface{}) error { // Create the HTTP request var req *http.Request e := &Endpoint{ - url: o.URL, + Url: o.URL, headers: o.Headers, } req, err = e.NewHTTPRequest(body) diff --git a/services/sideload/service.go b/services/sideload/service.go index 69f00f22b..eab64e419 100644 --- a/services/sideload/service.go +++ b/services/sideload/service.go @@ -15,6 +15,7 @@ import ( "github.com/ghodss/yaml" "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/services/httpd" + "github.com/influxdata/kapacitor/services/httppost" "github.com/pkg/errors" ) @@ -87,29 +88,29 @@ func (s *Service) Reload() error { return nil } -func (s *Service) Source(srcURL string) (Source, error) { +func (s *Service) Source(endpoint *httppost.Endpoint) (Source, error) { var src Source - u, err := url.Parse(srcURL) + u, err := url.Parse(endpoint.Url) if err != nil { return nil, err } - if u.Scheme != "file" && u.Scheme != "http" { + if u.Scheme != "file" && u.Scheme != "http" && u.Scheme != "https" { return nil, fmt.Errorf("unsupported source scheme %q, must be 'file' or 'http'", u.Scheme) } if u.Scheme == "file" { src, err = s.SourceFile(u.Path) - } else if u.Scheme == "http" { - src, err = s.SourceHttp(srcURL) + } else if u.Scheme == "http" || u.Scheme == "https" { + src, err = s.SourceHttp(endpoint, u.Scheme) } return src, err } -func (s *Service) SourceHttp(srcURL string) (Source, error) { +func (s *Service) SourceHttp(endpoint *httppost.Endpoint, scheme string) (Source, error) { var err error - dir := srcURL + dir := endpoint.Url s.mu.Lock() defer s.mu.Unlock() /* @@ -122,7 +123,8 @@ func (s *Service) SourceHttp(srcURL string) (Source, error) { src = &source{ s: s, dir: dir, - scheme: "http", + scheme: scheme, + e: endpoint, } err = src.updateCache() if err != nil { @@ -133,7 +135,7 @@ func (s *Service) SourceHttp(srcURL string) (Source, error) { src.referenceCount++ if err != nil { - return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", srcURL, err.Error()) + return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", dir, err.Error()) } return src, nil } @@ -178,12 +180,11 @@ type Source interface { } type source struct { - s *Service - scheme string - dir string - mu sync.RWMutex - httpUser string - httpPassword string + s *Service + scheme string + dir string + mu sync.RWMutex + e *httppost.Endpoint cache map[string]map[string]interface{} referenceCount int @@ -222,9 +223,10 @@ func (s *source) updateCacheFile() error { } func (s *source) updateCacheHttp() error { req, err := http.NewRequest("GET", s.dir, nil) - if s.httpUser != "" { - req.SetBasicAuth(s.httpUser, s.httpPassword) + if s.e.Auth.Username != "" && s.e.Auth.Password != "" { + req.SetBasicAuth(s.e.Auth.Username, s.e.Auth.Password) } + client := &http.Client{ Timeout: time.Second * 10, } @@ -251,7 +253,7 @@ func (s *source) updateCache() error { if s.scheme == "file" { return s.updateCacheFile() - } else if s.scheme == "http" { + } else if s.scheme == "http" || s.scheme == "https" { return s.updateCacheHttp() } return nil diff --git a/services/sideload/service_test.go b/services/sideload/service_test.go index ce2918996..5f1cc3264 100644 --- a/services/sideload/service_test.go +++ b/services/sideload/service_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/sideload" ) @@ -22,7 +23,10 @@ func TestService_Source_Lookup(t *testing.T) { if err != nil { t.Fatal(err) } - src, err := s.Source(fmt.Sprintf("file://%s/testdata/src0", wd)) + e := &httppost.Endpoint{ + Url: fmt.Sprintf("file://%s/testdata/src0", wd), + } + src, err := s.Source(e) if err != nil { t.Fatal(err) } diff --git a/sideload.go b/sideload.go index d8d426577..ecb1f56f9 100644 --- a/sideload.go +++ b/sideload.go @@ -2,17 +2,19 @@ package kapacitor import ( "fmt" - "strconv" - "text/template" - text "text/template" - "github.com/influxdata/kapacitor/bufpool" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/sideload" "github.com/pkg/errors" + "log" + "net/url" + "strconv" + "text/template" + text "text/template" ) type SideloadNode struct { @@ -20,15 +22,15 @@ type SideloadNode struct { s *pipeline.SideloadNode source sideload.Source orderTmpls []orderTmpl - - order []string - httpUser string - httpPassword string - bufferPool *bufpool.Pool + Endpoint *httppost.Endpoint + order []string + bufferPool *bufpool.Pool } // Create a new SideloadNode which loads fields and tags from external sources. func newSideloadNode(et *ExecutingTask, n *pipeline.SideloadNode, d NodeDiagnostic) (*SideloadNode, error) { + var e *httppost.Endpoint + var ok bool sn := &SideloadNode{ node: node{Node: n, et: et, diag: d}, s: n, @@ -36,11 +38,27 @@ func newSideloadNode(et *ExecutingTask, n *pipeline.SideloadNode, d NodeDiagnost order: make([]string, len(n.OrderList)), orderTmpls: make([]orderTmpl, len(n.OrderList)), } - src, err := et.tm.SideloadService.Source(n.Source) + u, err := url.Parse(n.Source) + if err != nil { + log.Fatal(err) + } + if u.Scheme == "" { + e, ok = et.tm.HTTPPostService.Endpoint(n.Source) + if !ok { + log.Fatal("Specified endpoint does not exist: " + n.Source) + } + } else { + e = &httppost.Endpoint{ + Url: n.Source, + } + } + sn.Endpoint = e + src, err := et.tm.SideloadService.Source(e) if err != nil { return nil, err } sn.source = src + for i, o := range n.OrderList { op, err := newOrderTmpl(o, sn.bufferPool) if err != nil { diff --git a/task_master.go b/task_master.go index f269aaaee..20a35b0c9 100644 --- a/task_master.go +++ b/task_master.go @@ -194,7 +194,7 @@ type TaskMaster struct { } SideloadService interface { - Source(dir string) (sideload.Source, error) + Source(*httppost.Endpoint) (sideload.Source, error) } Commander command.Commander From 05bc3d82196e974dcf1a75ff20814e5041fbb185 Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 16 Dec 2020 16:31:37 -0600 Subject: [PATCH 4/5] chore: update CHANGELOG.md --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1398151fb..48d9aa765 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ ### Features - [#1839](https://github.com/influxdata/kapacitor/pull/1839): Add Subscription path configuration option to allow Kapacitor to run behind a reverse proxy, thanks @aspring +- [#1894](https://github.com/influxdata/kapacitor/pull/1894): Add HTTP sources for sideload configuration, thanks @jregovic! - [#2055](https://github.com/influxdata/kapacitor/pull/2055): Add support for correlate in the Alerta AlertNode, thanks @nermolaev! - [#2409](https://github.com/influxdata/kapacitor/pull/2409): Optionally use kapacitor alert details as opsgenie description text, thanks @JamesClonk! - [#2441](https://github.com/influxdata/kapacitor/pull/2441): Preallocate GroupIDs for increased performance by reducing allocations. @@ -103,8 +104,6 @@ ## v1.5.0 [2018-05-17] ### Features -- [#1894](https://github.com/influxdata/kapacitor/pull/1894): Add HTTP sources for sideload configuration. - - [#1842](https://github.com/influxdata/kapacitor/pull/1842): Add alert inhibitors that allow an alert to suppress events from other matching alerts. - [#1833](https://github.com/influxdata/kapacitor/pull/1833): Config format updated to allow for more than one slack configuration. - [#1844](https://github.com/influxdata/kapacitor/pull/1844): Added a new kapacitor node changeDetect that emits a value From 1ca24a5c9bc63bf12f66b34fe31e26b4d547e27e Mon Sep 17 00:00:00 2001 From: "docmerlin (j. Emrys Landivar)" Date: Wed, 16 Dec 2020 18:50:00 -0600 Subject: [PATCH 5/5] chore: use types for behavior in sideload/service.go --- pipeline/sideload.go | 10 +-- services/sideload/service.go | 118 ++++++++++++++++++----------------- 2 files changed, 65 insertions(+), 63 deletions(-) diff --git a/pipeline/sideload.go b/pipeline/sideload.go index a86625dcf..5d981447f 100644 --- a/pipeline/sideload.go +++ b/pipeline/sideload.go @@ -16,20 +16,20 @@ import ( // // Add a field `cpu_threshold` and a tag `foo` to each point based on the value loaded from the hierarchical source. // The list of templates in the `.order()` property are evaluated using the points tags. -// .source may be one of: a file URI, a URL, or an endpoint name as configured in the kapacitor configuration file +// .source may be one of: a file URI, a URL, or an endpoint name as configured in the Kapacitor configuration file // as an httppost object. // -// If source is defiend as a plain string and not a URI (file://,http://), the source wil intepreseted as an endpoint -// defined in an [[httpost]] section in the kapacitor configuration. +// If source is defined as a plain string and not a URI (file://,http://), the source wil interpreted as an endpoint +// defined in an [[httpost]] section in the Kapacitor configuration. // // A source defined as an HTTP URL or [[httppost]] endpoint will be loaded as an HTTP GET and loaded only once when a // task is enabled, and then on subsequent calls to the /sideload/reload endpoint. -// An HTTP source ednpoint should return a JSON document where each property is a key name specified in the order statement +// An HTTP source endpoint should return a JSON document where each property is a key name specified in the order statement // and a its value is an object with a set of key/value pairs. // HTTP Source example: //{ // "host1" : { -// "cpu_threhsold":98, +// "cpu_threshold":98, // "some_tag": "value", // "disable": "False" // }, diff --git a/services/sideload/service.go b/services/sideload/service.go index 50e39476d..a1b1a5661 100644 --- a/services/sideload/service.go +++ b/services/sideload/service.go @@ -36,7 +36,7 @@ type Service struct { routes []httpd.Route mu sync.Mutex - sources map[string]*source + sources map[string]Source HTTPDService interface { AddRoutes([]httpd.Route) error @@ -47,7 +47,7 @@ type Service struct { func NewService(d Diagnostic) *Service { return &Service{ diag: d, - sources: make(map[string]*source), + sources: make(map[string]Source), } } @@ -82,7 +82,7 @@ func (s *Service) Reload() error { s.mu.Lock() defer s.mu.Unlock() for dir, src := range s.sources { - if err := src.updateCache(); err != nil { + if err := src.UpdateCache(); err != nil { return errors.Wrapf(err, "failed to update source %q", dir) } } @@ -124,19 +124,21 @@ func (s *Service) sourceHttp(endpoint *httppost.Endpoint, scheme string) (Source defer s.mu.Unlock() src, ok := s.sources[dir] if !ok { - src = &source{ - s: s, - dir: dir, - scheme: scheme, - e: endpoint, + src = &httpSource{ + fileSource: fileSource{ + s: s, + dir: dir, + scheme: scheme, + }, + e: endpoint, } - err = src.updateCache() + err = src.UpdateCache() if err != nil { return nil, fmt.Errorf("Error fetching sideload data from %s :: %s", dir, err.Error()) } s.sources[dir] = src } - src.referenceCount++ + src.addToReferenceCount(1) return src, nil } @@ -149,24 +151,24 @@ func (s *Service) sourceFile(path string) (Source, error) { defer s.mu.Unlock() src, ok := s.sources[dir] if !ok { - src = &source{ + src = &fileSource{ s: s, dir: dir, scheme: "file", } - err := src.updateCache() + err := src.UpdateCache() if err != nil { return nil, err } s.sources[dir] = src } - src.referenceCount++ + src.addToReferenceCount(1) return src, nil } -func (s *Service) removeSource(src *source) { +func (s *Service) removeSource(src *fileSource) { s.mu.Lock() defer s.mu.Unlock() src.referenceCount-- @@ -178,24 +180,30 @@ func (s *Service) removeSource(src *source) { type Source interface { Lookup(order []string, key string) interface{} Close() + UpdateCache() error + addToReferenceCount(int) int } -type source struct { - s *Service - scheme string - dir string - mu sync.RWMutex - e *httppost.Endpoint - +type fileSource struct { + s *Service + dir string + mu sync.RWMutex + scheme string cache map[string]map[string]interface{} referenceCount int } -func (s *source) Close() { +func (s *fileSource) addToReferenceCount(i int) int { + s.referenceCount += i + return s.referenceCount +} + +func (s *fileSource) Close() { s.s.removeSource(s) } -func (s *source) updateCacheFile() error { +func (s *fileSource) UpdateCache() error { + s.cache = make(map[string]map[string]interface{}) err := filepath.Walk(s.dir, func(path string, info os.FileInfo, err error) error { if err != nil { return err @@ -222,7 +230,35 @@ func (s *source) updateCacheFile() error { }) return errors.Wrapf(err, "failed to update sideload cache for source file %q", s.dir) } -func (s *source) updateCacheHttp() error { + +func (s *fileSource) Lookup(order []string, key string) (value interface{}) { + key = filepath.Clean(key) + + s.mu.RLock() + defer s.mu.RUnlock() + + for _, o := range order { + values, ok := s.cache[o] + if !ok { + continue + } + v, ok := values[key] + if !ok { + continue + } + value = v + break + } + return +} + +type httpSource struct { + fileSource + e *httppost.Endpoint +} + +func (s *httpSource) UpdateCache() error { + s.cache = make(map[string]map[string]interface{}) req, err := http.NewRequest("GET", s.dir, nil) if err != nil { return errors.Wrapf(err, "failed to generate request to update sideload cache for source %q", s.dir) @@ -250,40 +286,6 @@ func (s *source) updateCacheHttp() error { return nil } -func (s *source) updateCache() error { - s.mu.Lock() - defer s.mu.Unlock() - s.cache = make(map[string]map[string]interface{}) - - if s.scheme == "file" { - return s.updateCacheFile() - } else if s.scheme == "http" || s.scheme == "https" { - return s.updateCacheHttp() - } - return nil -} - -func (s *source) Lookup(order []string, key string) (value interface{}) { - key = filepath.Clean(key) - - s.mu.RLock() - defer s.mu.RUnlock() - - for _, o := range order { - values, ok := s.cache[o] - if !ok { - continue - } - v, ok := values[key] - if !ok { - continue - } - value = v - break - } - return -} - func readValues(p string) (map[string]interface{}, error) { f, err := os.Open(p) if err != nil {