diff --git a/CHANGELOG.md b/CHANGELOG.md index ca673e101..3fb876bf1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [#2605](https://github.com/influxdata/kapacitor/pull/2605): Updated jwt dependencies of libraries because of https://nvd.nist.gov/vuln/detail/CVE-2020-26160 - [#2601](https://github.com/influxdata/kapacitor/pull/2601): Switched to github.com/golang-jwt/jwt for kapacitor's use because of https://nvd.nist.gov/vuln/detail/CVE-2020-26160 - [#2618](https://github.com/influxdata/kapacitor/pull/2618): Switch task service to use Flux formatter that preserves comments +- [#2622](https://github.com/influxdata/kapacitor/pull/2622): auto-create 1.x DB or 2.x bucket for flux task logs ### Features diff --git a/barrier.go b/barrier.go index d1f122ad7..da3bd093d 100644 --- a/barrier.go +++ b/barrier.go @@ -2,10 +2,9 @@ package kapacitor import ( "errors" - "time" - "sync" "sync/atomic" + "time" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/models" diff --git a/http_post.go b/http_post.go index e774ef8b1..58136f19e 100644 --- a/http_post.go +++ b/http_post.go @@ -1,6 +1,8 @@ package kapacitor import ( + "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -9,9 +11,6 @@ import ( "sync" "time" - "bytes" - "context" - "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/models" diff --git a/influxdb/client.go b/influxdb/client.go index 108de1d34..992b72de2 100644 --- a/influxdb/client.go +++ b/influxdb/client.go @@ -52,6 +52,10 @@ type Client interface { // if it exists. Unlike QueryFlux, this returns a *Response // object. QueryFluxResponse(q FluxQuery) (*Response, error) + + // CreateBucketV2 uses the 2.x /api/v2/bucket api to create a bucket. + // Note that 1.x does not support this API + CreateBucketV2(bucket string, org string, orgID string) error } type ClientUpdater interface { @@ -147,6 +151,69 @@ type HTTPClient struct { compression string } +func (c *HTTPClient) getOrgID(org string) (string, error) { + u := c.url() + u.Path = "/api/v2/orgs" + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return "", err + } + reader, err := c.doHttpV2(req, 200) + if err != nil { + return "", err + } + var val struct { + Orgs []struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"orgs"` + } + err = json.NewDecoder(reader).Decode(&val) + if err != nil { + return "", fmt.Errorf("decoding json from org request: %w", err) + } + // Check for "org" matching id to account for odd flux query API that will accept an org ID as an org name + for _, o := range val.Orgs { + if o.ID == org { + return o.ID, nil + } + } + for _, o := range val.Orgs { + if o.Name == org { + return o.ID, nil + } + } + return "", fmt.Errorf("unknown organization name %s", org) +} + +func (c *HTTPClient) CreateBucketV2(bucket, org, orgID string) error { + u := c.url() + u.Path = "/api/v2/buckets" + + var err error + if orgID == "" { + orgID, err = c.getOrgID(org) + if err != nil { + return fmt.Errorf("attempting to get org id for org: %w", err) + } + } + + body := fmt.Sprintf(`{"orgID": %q, "name": %q, "retentionRules": [] }`, orgID, bucket) + + req, err := http.NewRequest("POST", u.String(), bytes.NewBufferString(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept-Encoding", "gzip") + reader, err := c.doHttpV2(req, 201) + if err != nil { + return err + } + _, err = io.Copy(io.Discard, reader) + return err +} + // NewHTTPClient returns a new Client from the provided config. // Client is safe for concurrent use by multiple goroutines. func NewHTTPClient(conf Config) (*HTTPClient, error) { @@ -367,7 +434,7 @@ func (r readClose) Close() error { var _ io.ReadCloser = readClose{} -func (c *HTTPClient) doFlux(req *http.Request, codes ...int) (io.ReadCloser, error) { +func (c *HTTPClient) doHttpV2(req *http.Request, codes ...int) (io.ReadCloser, error) { // Get current config config := c.loadConfig() // Set auth credentials @@ -422,7 +489,7 @@ func (c *HTTPClient) doFlux(req *http.Request, codes ...int) (io.ReadCloser, err Error string `json:"error"` }{} if err := d.Decode(&rp); err != nil { - return nil, err + return nil, fmt.Errorf("error attempting to decode http response, status %d: %w", resp.StatusCode, err) } if rp.Error != "" { return nil, errors.New(rp.Error) @@ -593,9 +660,7 @@ type Result struct { func (c *HTTPClient) buildFluxRequest(q FluxQuery) (*http.Request, error) { u := c.url() - if c.url().Path == "" || c.url().Path == "/" { - u.Path = "/api/v2/query" - } + u.Path = "/api/v2/query" v := url.Values{} if q.Org != "" { @@ -645,7 +710,7 @@ func (c *HTTPClient) QueryFluxResponse(q FluxQuery) (*Response, error) { if err != nil { return nil, err } - reader, err := c.doFlux(req, http.StatusOK) + reader, err := c.doHttpV2(req, http.StatusOK) if err != nil { return nil, err } @@ -663,7 +728,7 @@ func (c *HTTPClient) QueryFlux(q FluxQuery) (flux.ResultIterator, error) { if err != nil { return nil, err } - reader, err := c.doFlux(req, http.StatusOK) + reader, err := c.doHttpV2(req, http.StatusOK) if err != nil { return nil, err } diff --git a/influxdb/client_test.go b/influxdb/client_test.go index 114d55333..f77868440 100644 --- a/influxdb/client_test.go +++ b/influxdb/client_test.go @@ -13,6 +13,8 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestClient_Flux(t *testing.T) { @@ -451,3 +453,118 @@ func TestBatchPoints_SettersGetters(t *testing.T) { t.Errorf("Expected: %s, got %s", bp.WriteConsistency(), "wc2") } } + +const cannedOrgResponse = `{ + "links": { + "self": "/api/v2/orgs" + }, + "orgs": [ + { + "links": { + "buckets": "/api/v2/buckets?org=myorg", + "dashboards": "/api/v2/dashboards?org=myorg", + "labels": "/api/v2/orgs/e89732d2ac84d94a/labels", + "logs": "/api/v2/orgs/e89732d2ac84d94a/logs", + "members": "/api/v2/orgs/e89732d2ac84d94a/members", + "owners": "/api/v2/orgs/e89732d2ac84d94a/owners", + "secrets": "/api/v2/orgs/e89732d2ac84d94a/secrets", + "self": "/api/v2/orgs/e89732d2ac84d94a", + "tasks": "/api/v2/tasks?org=myorg" + }, + "id": "e89732d2ac84d94a", + "name": "org1", + "description": "", + "createdAt": "2021-09-13T18:57:23.816229Z", + "updatedAt": "2021-09-13T18:57:23.816239Z" + }, + { + "links": { + "buckets": "/api/v2/buckets?org=myorg", + "dashboards": "/api/v2/dashboards?org=myorg", + "labels": "/api/v2/orgs/0x0x0x0xac84d94a/labels", + "logs": "/api/v2/orgs/0x0x0x0xac84d94a/logs", + "members": "/api/v2/orgs/0x0x0x0xac84d94a/members", + "owners": "/api/v2/orgs/0x0x0x0xac84d94a/owners", + "secrets": "/api/v2/orgs/0x0x0x0xac84d94a/secrets", + "self": "/api/v2/orgs/0x0x0x0xac84d94a", + "tasks": "/api/v2/tasks?org=myorg" + }, + "id": "0x0x0x0xac84d94a", + "name": "org2", + "description": "", + "createdAt": "2021-09-13T18:57:23.816229Z", + "updatedAt": "2021-09-13T18:57:23.816239Z" + } + ] +}` + +func TestHTTPClient_CreateBucketV2(t *testing.T) { + + tests := []struct { + name string + org string + orgId string + bucket string + expectBucketBody string + wantErr string + }{ + { + name: "basic", + orgId: "x0x0x0", + bucket: "mybucket", + expectBucketBody: `{"orgID": "x0x0x0", "name": "mybucket", "retentionRules": [] }`, + wantErr: "", + }, + { + name: "with-unknown-org", + org: "some-other-org", + bucket: "mybucket", + expectBucketBody: "", + wantErr: "unknown organization name", + }, + { + name: "with-org", + org: "org2", + bucket: "mybucket", + expectBucketBody: `{"orgID": "0x0x0x0xac84d94a", "name": "mybucket", "retentionRules": [] }`, + wantErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotBucketBody := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "Token mytoken", r.Header.Get("Authorization")) + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + switch r.URL.Path { + case "/api/v2/buckets": + assert.Equal(t, tt.expectBucketBody, string(body)) + gotBucketBody = true + w.WriteHeader(201) + case "/api/v2/orgs": + w.WriteHeader(200) + w.Write([]byte(cannedOrgResponse)) + default: + w.WriteHeader(404) + } + })) + c, err := NewHTTPClient(Config{ + URLs: []string{ts.URL}, + Credentials: Credentials{ + Method: TokenAuthentication, + Token: "mytoken", + }, + }) + require.NoError(t, err) + err = c.CreateBucketV2(tt.bucket, tt.org, tt.orgId) + if tt.wantErr == "" { + assert.NoError(t, err) + } else { + assert.Contains(t, err.Error(), tt.wantErr) + } + assert.Equal(t, tt.expectBucketBody != "", gotBucketBody) + }) + } +} diff --git a/influxdb/token_client.go b/influxdb/token_client.go index d18aa8abf..6f25a8292 100644 --- a/influxdb/token_client.go +++ b/influxdb/token_client.go @@ -121,6 +121,10 @@ func (tc *tokenClient) QueryFluxResponse(q FluxQuery) (*Response, error) { return tc.client.QueryFluxResponse(q) } +func (tc *tokenClient) CreateBucketV2(bucket, org, orgID string) error { + return tc.client.CreateBucketV2(bucket, org, orgID) +} + func (tc *tokenClient) Open() error { tc.mu.Lock() defer tc.mu.Unlock() diff --git a/services/auth/service.go b/services/auth/service.go index 12eccf01a..0dc333b4c 100644 --- a/services/auth/service.go +++ b/services/auth/service.go @@ -22,7 +22,6 @@ import ( "github.com/influxdata/kapacitor/services/storage" "github.com/influxdata/kapacitor/tlsconfig" "github.com/pkg/errors" - "golang.org/x/crypto/bcrypt" ) diff --git a/services/bigpanda/service_test.go b/services/bigpanda/service_test.go index 526730c05..6f2b12d10 100644 --- a/services/bigpanda/service_test.go +++ b/services/bigpanda/service_test.go @@ -2,10 +2,11 @@ package bigpanda import ( "bytes" - "github.com/influxdata/kapacitor/alert" - "github.com/influxdata/kapacitor/models" "testing" "time" + + "github.com/influxdata/kapacitor/alert" + "github.com/influxdata/kapacitor/models" ) func TestService_SerializeEventData(t *testing.T) { diff --git a/services/discord/service.go b/services/discord/service.go index 83a0b4b05..239b6081f 100644 --- a/services/discord/service.go +++ b/services/discord/service.go @@ -7,11 +7,10 @@ import ( "io" "io/ioutil" "net/http" + "sync" text "text/template" "time" - "sync" - "github.com/influxdata/kapacitor/alert" khttp "github.com/influxdata/kapacitor/http" "github.com/influxdata/kapacitor/keyvalue" diff --git a/services/fluxtask/service.go b/services/fluxtask/service.go index fb93d2f76..8cbb3e8b7 100644 --- a/services/fluxtask/service.go +++ b/services/fluxtask/service.go @@ -2,6 +2,7 @@ package fluxtask import ( "context" + "fmt" "time" "github.com/influxdata/influxdb/v2/kit/platform" @@ -42,9 +43,16 @@ func (s *Service) Open() error { if s.config.Enabled { // create the task stack s.kvService = kv.New(s.StorageService) - s.kvService.Open() + if err := s.kvService.Open(); err != nil { + return err + } + bucket := s.config.TaskRunBucket + if bucket == "" { + // Deal with previous default of empty string + bucket = task.DefaultTaskRunBucket + } dataDestination := backend.DataDestination{ - Bucket: s.config.TaskRunBucket, + Bucket: bucket, Org: s.config.TaskRunOrg, OrgID: s.config.TaskRunOrgID, Measurement: s.config.TaskRunMeasurement, @@ -60,13 +68,16 @@ func (s *Service) Open() error { if err != nil { return err } - combinedTaskService := backend.NewAnalyticalStorage( + combinedTaskService, err := backend.NewAnalyticalStorage( s.logger.With(zap.String("service", "fluxtask-analytical-store")), taskService, taskControlService, cli, dataDestination, ) + if err != nil { + return fmt.Errorf("creating analytical store: %w", err) + } taskService = combinedTaskService taskControlService = combinedTaskService } diff --git a/services/httppost/service.go b/services/httppost/service.go index 0ae1b6600..eadce638a 100644 --- a/services/httppost/service.go +++ b/services/httppost/service.go @@ -2,6 +2,7 @@ package httppost import ( "bytes" + "context" "crypto/tls" "encoding/json" "fmt" @@ -12,11 +13,8 @@ import ( "strings" "sync" "text/template" - "time" - "context" - "github.com/influxdata/kapacitor/alert" khttp "github.com/influxdata/kapacitor/http" "github.com/influxdata/kapacitor/keyvalue" diff --git a/services/influxdb/service.go b/services/influxdb/service.go index 54746e959..dc45d95cb 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -563,6 +563,11 @@ func (c *influxdbCluster) Open() error { c.watchSubs() + // Even if we're not linking subscriptions, validate the client to ensure influxdb is actually up + if err := c.validateClientWithBackoff(ctx); err != nil { + return errors.Wrap(err, "failed to validate influxdb client on startup") + } + if err := c.linkSubscriptions(ctx, c.subName); err != nil { return errors.Wrap(err, "failed to link subscription on startup") } diff --git a/services/influxdb/service_test.go b/services/influxdb/service_test.go index 87b9bdc0a..5a76850ce 100644 --- a/services/influxdb/service_test.go +++ b/services/influxdb/service_test.go @@ -1303,6 +1303,10 @@ func (c influxDBClient) Update(config influxcli.Config) error { return nil } +func (c influxDBClient) CreateBucketV2(bucket string, org string, orgID string) error { + return nil +} + type logSerivce struct { } diff --git a/services/load/service.go b/services/load/service.go index 39518841c..5028b9130 100644 --- a/services/load/service.go +++ b/services/load/service.go @@ -12,7 +12,6 @@ import ( "sync" "github.com/ghodss/yaml" - "github.com/influxdata/kapacitor/client/v1" kexpvar "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/server/vars" diff --git a/services/smtp/service.go b/services/smtp/service.go index c3bc7fa92..8cb01592e 100644 --- a/services/smtp/service.go +++ b/services/smtp/service.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/keyvalue" - "gopkg.in/gomail.v2" ) diff --git a/services/teams/service.go b/services/teams/service.go index eac085341..a330a2eaf 100644 --- a/services/teams/service.go +++ b/services/teams/service.go @@ -6,12 +6,11 @@ import ( "fmt" "io" "io/ioutil" + "math" "net/http" "net/url" "sync/atomic" - "math" - "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/keyvalue" "github.com/pkg/errors" diff --git a/services/telegram/service.go b/services/telegram/service.go index d0ef7cbda..a615a3481 100644 --- a/services/telegram/service.go +++ b/services/telegram/service.go @@ -9,9 +9,8 @@ import ( "net/http" "net/url" "path" - "sync/atomic" - "strings" + "sync/atomic" "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/keyvalue" diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index a7f6f20b6..7a9826736 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" "github.com/influxdata/flux" @@ -29,6 +30,79 @@ const ( statusTag = "status" ) +func CreateDestination(log *zap.Logger, cli influxdb.Client, dest DataDestination) error { + const numTries = 5 + for try := 0; ; try++ { + // poll for influxdb to come up - usually it should be already up by the time we get here. + if try > 0 { + time.Sleep(1 * time.Second) + } + queryForError := func(q string) error { + // This query is only successful if the bucket exists + result, err := cli.QueryFlux(influxdb.FluxQuery{ + Org: dest.Org, + OrgID: dest.OrgID, + Query: q, + }) + if err != nil { + return err + } + for result.More() { + if err := result.Next().Tables().Do(func(table flux.Table) error { + return table.Do(func(col flux.ColReader) error { + return nil + }) + }); err != nil { + return err + } + } + return result.Err() + } + + // We do this instead of interpreting the response from `buckets()` because + // for 1.x `from(bucket: "mydb")` is valid and means to use the default retention + // policy, but `buckets()` will return the fully qualified retention policy. + err := queryForError(fmt.Sprintf(`from(bucket: %q) |> range(start: -1s) |> filter(fn: (r) => r._measurement == %q)`, dest.Bucket, dest.Measurement)) + if err == nil { + log.Info("Successfully read from bucket for task analytic store") + return nil + } + log.Info("Error reading from bucket for task analytic storage, attempting to create it", zap.Error(err), zap.String("name", dest.Bucket)) + + if try > numTries { + break + } + + // First check if we're reading any flux queries at all + err = queryForError("buckets()") + if err != nil { + log.Warn("InfluxDB instance for analytic store not processing flux queries", zap.Error(err)) + continue + } + + if strings.Contains(dest.Bucket, "/") { + log.Info("Bucket contains /, creating a 1.x database with non-default retention policy is not supported, skipping 1.x database creation", zap.String("name", dest.Bucket)) + } else { + _, err := cli.Query(influxdb.Query{ + Command: fmt.Sprintf("CREATE DATABASE %q", dest.Bucket), + }) + if err != nil { + log.Warn("Error attempting to create 1.x database", zap.Error(err)) + } else { + // we successfully tried to create a database + continue + } + } + + log.Info("Could not create using 1.x CREATE DATABASE api, try 2.x /api/v2/bucket instead") + err = cli.CreateBucketV2(dest.Bucket, dest.Org, dest.OrgID) + if err != nil { + log.Warn("Error attempting to create 2.x bucket", zap.Error(err)) + } + } + return fmt.Errorf("could not create database for analytic store, tried %d times", numTries) +} + // RunRecorder is a type which records runs into an influxdb // backed storage mechanism type RunRecorder interface { @@ -41,22 +115,10 @@ type PointsWriter interface { type NoOpPointsWriter struct{} -func (*NoOpPointsWriter) WritePoints(ctx context.Context, bucket string, points models.Points) error { +func (*NoOpPointsWriter) WritePoints(_ context.Context, _ string, _ models.Points) error { return nil } -// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware (deprecated) -func NewAnalyticalStorage(log *zap.Logger, ts taskmodel.TaskService, tcs TaskControlService, cli influxdb.Client, dest DataDestination) *AnalyticalStorage { - return &AnalyticalStorage{ - log: log, - TaskService: ts, - TaskControlService: tcs, - destination: dest, - rr: NewStoragePointsWriterRecorder(log, cli, dest), - cli: cli, - } -} - type DataDestination struct { Bucket string Org string @@ -74,6 +136,21 @@ type AnalyticalStorage struct { log *zap.Logger } +// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware (deprecated) +func NewAnalyticalStorage(log *zap.Logger, ts taskmodel.TaskService, tcs TaskControlService, cli influxdb.Client, dest DataDestination) (*AnalyticalStorage, error) { + if err := CreateDestination(log, cli, dest); err != nil { + return nil, err + } + return &AnalyticalStorage{ + log: log, + TaskService: ts, + TaskControlService: tcs, + destination: dest, + rr: NewStoragePointsWriterRecorder(log, cli, dest), + cli: cli, + }, nil +} + func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID platform.ID) (*taskmodel.Run, error) { run, err := as.TaskControlService.FinishRun(ctx, taskID, runID) if err != nil { @@ -83,7 +160,7 @@ func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID platfo } // FindLogs returns logs for a run. -// First attempt to use the TaskService, then append additional analytical's logs to the list +// First attempt to use the TaskService, then append additional analytical logs to the list func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter taskmodel.LogFilter) ([]*taskmodel.Log, int, error) { var logs []*taskmodel.Log if filter.Run != nil { @@ -113,7 +190,7 @@ func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter taskmodel.LogF } // FindRuns returns a list of runs that match a filter and the total count of returned runs. -// First attempt to use the TaskService, then append additional analytical's runs to the list +// First attempt to use the TaskService, then append additional analytical runs to the list func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter taskmodel.RunFilter) ([]*taskmodel.Run, int, error) { if filter.Limit == 0 { filter.Limit = taskmodel.TaskDefaultPageSize diff --git a/task/config.go b/task/config.go index 0eab67a45..1511640ff 100644 --- a/task/config.go +++ b/task/config.go @@ -38,9 +38,12 @@ type Config struct { Secrets map[string]string `toml:"secrets"` } +const DefaultTaskRunBucket = "kapacitor_fluxtask_logs" + func NewConfig() Config { return Config{ TaskRunMeasurement: "runs", + TaskRunBucket: DefaultTaskRunBucket, } } @@ -54,8 +57,5 @@ func (c Config) Validate() error { if len(c.TaskRunOrgID) > 0 && len(c.TaskRunOrg) > 0 { return fmt.Errorf("only one of task-run-org and task-run-orgid should be set") } - if len(c.TaskRunBucket) == 0 { - return fmt.Errorf("task-run-bucket is required") - } return nil } diff --git a/tick/stack.go b/tick/stack.go index 1ff8e0101..f9a62eedd 100644 --- a/tick/stack.go +++ b/tick/stack.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - //"log" ) var ErrEmptyStack = errors.New("stack is empty")