Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto-create 1.x DB or 2.x bucket for flux task logs #2622

Merged
merged 2 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [#2605](https://github.com/influxdata/kapacitor/pull/2605): Updated jwt dependencies of libraries because of https://nvd.nist.gov/vuln/detail/CVE-2020-26160
- [#2601](https://github.com/influxdata/kapacitor/pull/2601): Switched to github.com/golang-jwt/jwt for kapacitor's use because of https://nvd.nist.gov/vuln/detail/CVE-2020-26160
- [#2618](https://github.com/influxdata/kapacitor/pull/2618): Switch task service to use Flux formatter that preserves comments
- [#2622](https://github.com/influxdata/kapacitor/pull/2622): auto-create 1.x DB or 2.x bucket for flux task logs

### Features

Expand Down
3 changes: 1 addition & 2 deletions barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package kapacitor

import (
"errors"
"time"

"sync"
"sync/atomic"
"time"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/models"
Expand Down
5 changes: 2 additions & 3 deletions http_post.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package kapacitor

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -9,9 +11,6 @@ import (
"sync"
"time"

"bytes"
"context"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
Expand Down
79 changes: 72 additions & 7 deletions influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Client interface {
// if it exists. Unlike QueryFlux, this returns a *Response
// object.
QueryFluxResponse(q FluxQuery) (*Response, error)

// CreateBucketV2 uses the 2.x /api/v2/bucket api to create a bucket.
// Note that 1.x does not support this API
CreateBucketV2(bucket string, org string, orgID string) error
}

type ClientUpdater interface {
Expand Down Expand Up @@ -147,6 +151,69 @@ type HTTPClient struct {
compression string
}

func (c *HTTPClient) getOrgID(org string) (string, error) {
u := c.url()
u.Path = "/api/v2/orgs"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return "", err
}
reader, err := c.doHttpV2(req, 200)
if err != nil {
return "", err
}
var val struct {
Orgs []struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"orgs"`
}
err = json.NewDecoder(reader).Decode(&val)
if err != nil {
return "", fmt.Errorf("decoding json from org request: %w", err)
}
// Check for "org" matching id to account for odd flux query API that will accept an org ID as an org name
for _, o := range val.Orgs {
if o.ID == org {
return o.ID, nil
}
}
for _, o := range val.Orgs {
if o.Name == org {
return o.ID, nil
}
}
return "", fmt.Errorf("unknown organization name %s", org)
}

func (c *HTTPClient) CreateBucketV2(bucket, org, orgID string) error {
u := c.url()
u.Path = "/api/v2/buckets"

var err error
if orgID == "" {
orgID, err = c.getOrgID(org)
if err != nil {
return fmt.Errorf("attempting to get org id for org: %w", err)
}
}

body := fmt.Sprintf(`{"orgID": %q, "name": %q, "retentionRules": [] }`, orgID, bucket)

req, err := http.NewRequest("POST", u.String(), bytes.NewBufferString(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")
reader, err := c.doHttpV2(req, 201)
if err != nil {
return err
}
_, err = io.Copy(io.Discard, reader)
return err
}

// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient(conf Config) (*HTTPClient, error) {
Expand Down Expand Up @@ -367,7 +434,7 @@ func (r readClose) Close() error {

var _ io.ReadCloser = readClose{}

func (c *HTTPClient) doFlux(req *http.Request, codes ...int) (io.ReadCloser, error) {
func (c *HTTPClient) doHttpV2(req *http.Request, codes ...int) (io.ReadCloser, error) {
// Get current config
config := c.loadConfig()
// Set auth credentials
Expand Down Expand Up @@ -422,7 +489,7 @@ func (c *HTTPClient) doFlux(req *http.Request, codes ...int) (io.ReadCloser, err
Error string `json:"error"`
}{}
if err := d.Decode(&rp); err != nil {
return nil, err
return nil, fmt.Errorf("error attempting to decode http response, status %d: %w", resp.StatusCode, err)
}
if rp.Error != "" {
return nil, errors.New(rp.Error)
Expand Down Expand Up @@ -593,9 +660,7 @@ type Result struct {

func (c *HTTPClient) buildFluxRequest(q FluxQuery) (*http.Request, error) {
u := c.url()
if c.url().Path == "" || c.url().Path == "/" {
u.Path = "/api/v2/query"
}
u.Path = "/api/v2/query"

v := url.Values{}
if q.Org != "" {
Expand Down Expand Up @@ -645,7 +710,7 @@ func (c *HTTPClient) QueryFluxResponse(q FluxQuery) (*Response, error) {
if err != nil {
return nil, err
}
reader, err := c.doFlux(req, http.StatusOK)
reader, err := c.doHttpV2(req, http.StatusOK)
if err != nil {
return nil, err
}
Expand All @@ -663,7 +728,7 @@ func (c *HTTPClient) QueryFlux(q FluxQuery) (flux.ResultIterator, error) {
if err != nil {
return nil, err
}
reader, err := c.doFlux(req, http.StatusOK)
reader, err := c.doHttpV2(req, http.StatusOK)
if err != nil {
return nil, err
}
Expand Down
117 changes: 117 additions & 0 deletions influxdb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClient_Flux(t *testing.T) {
Expand Down Expand Up @@ -451,3 +453,118 @@ func TestBatchPoints_SettersGetters(t *testing.T) {
t.Errorf("Expected: %s, got %s", bp.WriteConsistency(), "wc2")
}
}

const cannedOrgResponse = `{
"links": {
"self": "/api/v2/orgs"
},
"orgs": [
{
"links": {
"buckets": "/api/v2/buckets?org=myorg",
"dashboards": "/api/v2/dashboards?org=myorg",
"labels": "/api/v2/orgs/e89732d2ac84d94a/labels",
"logs": "/api/v2/orgs/e89732d2ac84d94a/logs",
"members": "/api/v2/orgs/e89732d2ac84d94a/members",
"owners": "/api/v2/orgs/e89732d2ac84d94a/owners",
"secrets": "/api/v2/orgs/e89732d2ac84d94a/secrets",
"self": "/api/v2/orgs/e89732d2ac84d94a",
"tasks": "/api/v2/tasks?org=myorg"
},
"id": "e89732d2ac84d94a",
"name": "org1",
"description": "",
"createdAt": "2021-09-13T18:57:23.816229Z",
"updatedAt": "2021-09-13T18:57:23.816239Z"
},
{
"links": {
"buckets": "/api/v2/buckets?org=myorg",
"dashboards": "/api/v2/dashboards?org=myorg",
"labels": "/api/v2/orgs/0x0x0x0xac84d94a/labels",
"logs": "/api/v2/orgs/0x0x0x0xac84d94a/logs",
"members": "/api/v2/orgs/0x0x0x0xac84d94a/members",
"owners": "/api/v2/orgs/0x0x0x0xac84d94a/owners",
"secrets": "/api/v2/orgs/0x0x0x0xac84d94a/secrets",
"self": "/api/v2/orgs/0x0x0x0xac84d94a",
"tasks": "/api/v2/tasks?org=myorg"
},
"id": "0x0x0x0xac84d94a",
"name": "org2",
"description": "",
"createdAt": "2021-09-13T18:57:23.816229Z",
"updatedAt": "2021-09-13T18:57:23.816239Z"
}
]
}`

func TestHTTPClient_CreateBucketV2(t *testing.T) {

tests := []struct {
name string
org string
orgId string
bucket string
expectBucketBody string
wantErr string
}{
{
name: "basic",
orgId: "x0x0x0",
bucket: "mybucket",
expectBucketBody: `{"orgID": "x0x0x0", "name": "mybucket", "retentionRules": [] }`,
wantErr: "",
},
{
name: "with-unknown-org",
org: "some-other-org",
bucket: "mybucket",
expectBucketBody: "",
wantErr: "unknown organization name",
},
{
name: "with-org",
org: "org2",
bucket: "mybucket",
expectBucketBody: `{"orgID": "0x0x0x0xac84d94a", "name": "mybucket", "retentionRules": [] }`,
wantErr: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotBucketBody := false
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "Token mytoken", r.Header.Get("Authorization"))
body, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
switch r.URL.Path {
case "/api/v2/buckets":
assert.Equal(t, tt.expectBucketBody, string(body))
gotBucketBody = true
w.WriteHeader(201)
case "/api/v2/orgs":
w.WriteHeader(200)
w.Write([]byte(cannedOrgResponse))
default:
w.WriteHeader(404)
}
}))
c, err := NewHTTPClient(Config{
URLs: []string{ts.URL},
Credentials: Credentials{
Method: TokenAuthentication,
Token: "mytoken",
},
})
require.NoError(t, err)
err = c.CreateBucketV2(tt.bucket, tt.org, tt.orgId)
if tt.wantErr == "" {
assert.NoError(t, err)
} else {
assert.Contains(t, err.Error(), tt.wantErr)
}
assert.Equal(t, tt.expectBucketBody != "", gotBucketBody)
})
}
}
4 changes: 4 additions & 0 deletions influxdb/token_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (tc *tokenClient) QueryFluxResponse(q FluxQuery) (*Response, error) {
return tc.client.QueryFluxResponse(q)
}

func (tc *tokenClient) CreateBucketV2(bucket, org, orgID string) error {
return tc.client.CreateBucketV2(bucket, org, orgID)
}

func (tc *tokenClient) Open() error {
tc.mu.Lock()
defer tc.mu.Unlock()
Expand Down
1 change: 0 additions & 1 deletion services/auth/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/influxdata/kapacitor/services/storage"
"github.com/influxdata/kapacitor/tlsconfig"
"github.com/pkg/errors"

"golang.org/x/crypto/bcrypt"
)

Expand Down
5 changes: 3 additions & 2 deletions services/bigpanda/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package bigpanda

import (
"bytes"
"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/models"
"testing"
"time"

"github.com/influxdata/kapacitor/alert"
"github.com/influxdata/kapacitor/models"
)

func TestService_SerializeEventData(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions services/discord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"io"
"io/ioutil"
"net/http"
"sync"
text "text/template"
"time"

"sync"

"github.com/influxdata/kapacitor/alert"
khttp "github.com/influxdata/kapacitor/http"
"github.com/influxdata/kapacitor/keyvalue"
Expand Down
17 changes: 14 additions & 3 deletions services/fluxtask/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fluxtask

import (
"context"
"fmt"
"time"

"github.com/influxdata/influxdb/v2/kit/platform"
Expand Down Expand Up @@ -42,9 +43,16 @@ func (s *Service) Open() error {
if s.config.Enabled {
// create the task stack
s.kvService = kv.New(s.StorageService)
s.kvService.Open()
if err := s.kvService.Open(); err != nil {
return err
}
bucket := s.config.TaskRunBucket
if bucket == "" {
// Deal with previous default of empty string
bucket = task.DefaultTaskRunBucket
}
dataDestination := backend.DataDestination{
Bucket: s.config.TaskRunBucket,
Bucket: bucket,
Org: s.config.TaskRunOrg,
OrgID: s.config.TaskRunOrgID,
Measurement: s.config.TaskRunMeasurement,
Expand All @@ -60,13 +68,16 @@ func (s *Service) Open() error {
if err != nil {
return err
}
combinedTaskService := backend.NewAnalyticalStorage(
combinedTaskService, err := backend.NewAnalyticalStorage(
s.logger.With(zap.String("service", "fluxtask-analytical-store")),
taskService,
taskControlService,
cli,
dataDestination,
)
if err != nil {
return fmt.Errorf("creating analytical store: %w", err)
}
taskService = combinedTaskService
taskControlService = combinedTaskService
}
Expand Down
Loading