diff --git a/CHANGELOG.md b/CHANGELOG.md index f29f1d131d6..9541c1bee8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ 1. [16175](https://github.com/influxdata/influxdb/pull/16175): Added delete functionality to note cells so that they can be deleted 1. [16204](https://github.com/influxdata/influxdb/pull/16204): Fix failure to create labels when creating telegraf configs 1. [16207](https://github.com/influxdata/influxdb/pull/16207): Fix crash when editing a Telegraf config +1. [16201](https://github.com/influxdata/influxdb/pull/16201): Updated start/endtime functionality so that custom script timeranges overwrite dropdown selections +1. [16217](https://github.com/influxdata/influxdb/pull/16217): Fix 12-hour time format to use consistent formatting and number of time ticks ### UI Improvements diff --git a/cmd/influx/bucket.go b/cmd/influx/bucket.go index 96b27db7a49..0e69c699f61 100644 --- a/cmd/influx/bucket.go +++ b/cmd/influx/bucket.go @@ -81,25 +81,14 @@ func bucketCreateF(cmd *cobra.Command, args []string) error { RetentionPeriod: bucketCreateFlags.retention, } - if bucketCreateFlags.orgID != "" { - id, err := platform.IDFromString(bucketCreateFlags.orgID) - if err != nil { - return fmt.Errorf("failed to decode org id %q: %v", bucketCreateFlags.orgID, err) - } - b.OrgID = *id - } else if bucketCreateFlags.org != "" { - orgSvc, err := newOrganizationService() - if err != nil { - return fmt.Errorf("failed to initialize organization service client: %v", err) - } - - filter := platform.OrganizationFilter{Name: &bucketCreateFlags.org} - org, err := orgSvc.FindOrganization(context.Background(), filter) - if err != nil { - return err - } + orgSvc, err := newOrganizationService() + if err != nil { + return err + } - b.OrgID = org.ID + b.OrgID, err = getOrgID(orgSvc, bucketCreateFlags.orgID, bucketCreateFlags.org) + if err != nil { + return err } if err := s.CreateBucket(context.Background(), b); err != nil { diff --git a/cmd/influx/inspect.go
b/cmd/influx/inspect.go index a04d1105f22..c73423a8355 100644 --- a/cmd/influx/inspect.go +++ b/cmd/influx/inspect.go @@ -1,7 +1,6 @@ package main import ( - "context" "errors" "fmt" "os" @@ -95,25 +94,15 @@ func inspectReportTSMF(cmd *cobra.Command, args []string) error { return errors.New("org-id must be set for non-empty bucket-id") } - if inspectReportTSMFlags.orgID != "" { - var err error - report.OrgID, err = influxdb.IDFromString(inspectReportTSMFlags.orgID) - if err != nil { - return fmt.Errorf("invalid org ID provided: %s", err.Error()) - } - } else if inspectReportTSMFlags.org != "" { - orgSvc, err := newOrganizationService() - if err != nil { - return fmt.Errorf("failed to initialize organization service client: %v", err) - } - - filter := influxdb.OrganizationFilter{Name: &inspectReportTSMFlags.org} - org, err := orgSvc.FindOrganization(context.Background(), filter) - if err != nil { - return fmt.Errorf("%v", err) - } - report.OrgID = &org.ID + orgSvc, err := newOrganizationService() + if err != nil { + return err + } + id, err := getOrgID(orgSvc, inspectReportTSMFlags.orgID, inspectReportTSMFlags.org) + if err != nil { + return err } + report.OrgID = &id if inspectReportTSMFlags.bucketID != "" { bucketID, err := influxdb.IDFromString(inspectReportTSMFlags.bucketID) @@ -123,7 +112,7 @@ func inspectReportTSMF(cmd *cobra.Command, args []string) error { report.BucketID = bucketID } - _, err := report.Run(true) + _, err = report.Run(true) if err != nil { panic(err) } diff --git a/cmd/influx/main.go b/cmd/influx/main.go index fc0ca8ef20c..c29aeea15b0 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -242,3 +242,23 @@ func newLocalKVService() (*kv.Service, error) { return kv.NewService(zap.NewNop(), store), nil } + +func getOrgID(orgSVC influxdb.OrganizationService, id string, name string) (influxdb.ID, error) { + if id != "" { + influxOrgID, err := influxdb.IDFromString(id) + if err != nil { + return 0, fmt.Errorf("invalid org ID provided: %s",
err.Error()) + } + return *influxOrgID, nil + } else if name != "" { + org, err := orgSVC.FindOrganization(context.Background(), influxdb.OrganizationFilter{ + Name: &name, + }) + if err != nil { + return 0, fmt.Errorf("%v", err) + } + return org.ID, nil + } + + return 0, fmt.Errorf("no org ID or org name provided") +} diff --git a/cmd/influx/pkg.go b/cmd/influx/pkg.go index 129457aebae..4deef08854c 100644 --- a/cmd/influx/pkg.go +++ b/cmd/influx/pkg.go @@ -27,16 +27,16 @@ import ( "gopkg.in/yaml.v3" ) -type pkgSVCFn func(cliReq httpClientOpts, opts ...pkger.ServiceSetterFn) (pkger.SVC, error) +type pkgSVCsFn func(cliReq httpClientOpts, opts ...pkger.ServiceSetterFn) (pkger.SVC, influxdb.OrganizationService, error) -func cmdPkg(svcFn pkgSVCFn, opts ...genericCLIOptfn) *cobra.Command { +func cmdPkg(svcFn pkgSVCsFn, opts ...genericCLIOptfn) *cobra.Command { return newCmdPkgBuilder(svcFn, opts...).cmdPkg() } type cmdPkgBuilder struct { genericCLIOpts - svcFn pkgSVCFn + svcFn pkgSVCsFn applyReqLimit int file string @@ -60,7 +60,7 @@ type cmdPkgBuilder struct { } } -func newCmdPkgBuilder(svcFn pkgSVCFn, opts ...genericCLIOptfn) *cmdPkgBuilder { +func newCmdPkgBuilder(svcFn pkgSVCsFn, opts ...genericCLIOptfn) *cmdPkgBuilder { opt := genericCLIOpts{ in: os.Stdin, w: os.Stdout, @@ -107,39 +107,30 @@ func (b *cmdPkgBuilder) cmdPkgApply() *cobra.Command { return cmd } +func (b *cmdPkgBuilder) validOrgFlags() error { + if b.orgID == "" && b.org == "" { + return fmt.Errorf("must specify org-id, or org name") + } else if b.orgID != "" && b.org != "" { + return fmt.Errorf("must specify org-id, or org name not both") + } + return nil +} + func (b *cmdPkgBuilder) pkgApplyRunEFn() func(*cobra.Command, []string) error { return func(cmd *cobra.Command, args []string) (e error) { - if b.orgID == "" && b.org == "" { - return fmt.Errorf("must specify org-id, or org name") - } else if b.orgID != "" && b.org != "" { - return fmt.Errorf("must specify org-id, or org name not both") + if err := b.validOrgFlags(); err !=
nil { + return err } color.NoColor = !b.hasColor - var influxOrgID *influxdb.ID - - if b.orgID != "" { - var err error - influxOrgID, err = influxdb.IDFromString(b.orgID) - if err != nil { - return fmt.Errorf("invalid org ID provided: %s", err.Error()) - } - } else if b.org != "" { - orgSvc, err := newOrganizationService() - if err != nil { - return fmt.Errorf("failed to initialize organization service client: %v", err) - } - filter := influxdb.OrganizationFilter{Name: &b.org} - org, err := orgSvc.FindOrganization(context.Background(), filter) - if err != nil { - return fmt.Errorf("%v", err) - } - influxOrgID = &org.ID + svc, orgSVC, err := b.svcFn(flags.httpClientOpts(), pkger.WithApplyReqLimit(b.applyReqLimit)) + if err != nil { + return err } - svc, err := b.svcFn(flags.httpClientOpts(), pkger.WithApplyReqLimit(b.applyReqLimit)) + influxOrgID, err := getOrgID(orgSVC, b.orgID, b.org) if err != nil { - return err + return err } pkg, isTTY, err := b.readPkgStdInOrFile(b.file) @@ -147,7 +138,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn() func(*cobra.Command, []string) error { return err } - _, diff, err := svc.DryRun(context.Background(), *influxOrgID, pkg) + _, diff, err := svc.DryRun(context.Background(), influxOrgID, pkg) if err != nil { return err } @@ -174,7 +165,7 @@ func (b *cmdPkgBuilder) pkgApplyRunEFn() func(*cobra.Command, []string) error { return errors.New("package has conflicts with existing resources and cannot safely apply") } - summary, err := svc.Apply(context.Background(), *influxOrgID, pkg) + summary, err := svc.Apply(context.Background(), influxOrgID, pkg) if err != nil { return err } @@ -221,7 +212,7 @@ func (b *cmdPkgBuilder) pkgNewRunEFn() func(*cobra.Command, []string) error { } } - pkgSVC, err := b.svcFn(flags.httpClientOpts()) + pkgSVC, _, err := b.svcFn(flags.httpClientOpts()) if err != nil { return err } @@ -253,7 +244,7 @@ func (b *cmdPkgBuilder) cmdPkgExport() *cobra.Command { func (b *cmdPkgBuilder) pkgExportRunEFn()
func(*cobra.Command, []string) error { return func(cmd *cobra.Command, args []string) error { - pkgSVC, err := b.svcFn(flags.httpClientOpts()) + pkgSVC, _, err := b.svcFn(flags.httpClientOpts()) if err != nil { return err } @@ -308,7 +299,8 @@ func (b *cmdPkgBuilder) cmdPkgExportAll() *cobra.Command { cmd.Short = "Export all existing resources for an organization as a package" cmd.Flags().StringVarP(&b.file, "file", "f", "", "output file for created pkg; defaults to std out if no file provided; the extension of provided file (.yml/.json) will dictate encoding") - cmd.Flags().StringVarP(&b.orgID, "org-id", "o", "", "organization id") + cmd.Flags().StringVarP(&b.orgID, "org-id", "", "", "organization id") + cmd.Flags().StringVarP(&b.org, "org", "o", "", "The name of the organization that owns the bucket") cmd.Flags().StringVarP(&b.meta.Name, "name", "n", "", "name for new pkg") cmd.Flags().StringVarP(&b.meta.Description, "description", "d", "", "description for new pkg") cmd.Flags().StringVarP(&b.meta.Version, "version", "v", "", "version for new pkg") @@ -320,18 +312,22 @@ func (b *cmdPkgBuilder) cmdPkgExportAll() *cobra.Command { func (b *cmdPkgBuilder) pkgExportAllRunEFn() func(*cobra.Command, []string) error { return func(cmd *cobra.Command, args []string) error { - pkgSVC, err := b.svcFn(flags.httpClientOpts()) + if err := b.validOrgFlags(); err != nil { + return err + } + + pkgSVC, orgSVC, err := b.svcFn(flags.httpClientOpts()) if err != nil { return err } opts := []pkger.CreatePkgSetFn{pkger.CreateWithMetadata(b.meta)} - orgID, err := influxdb.IDFromString(b.orgID) + orgID, err := getOrgID(orgSVC, b.orgID, b.org) if err != nil { return err } - opts = append(opts, pkger.CreateWithAllOrgResources(*orgID)) + opts = append(opts, pkger.CreateWithAllOrgResources(orgID)) return b.writePkg(cmd.OutOrStdout(), pkgSVC, b.file, opts...) 
} @@ -507,10 +503,14 @@ func createPkgBuf(pkg *pkger.Pkg, outPath string) (*bytes.Buffer, error) { return &buf, nil } -func newPkgerSVC(cliReqOpts httpClientOpts, opts ...pkger.ServiceSetterFn) (pkger.SVC, error) { +func newPkgerSVC(cliReqOpts httpClientOpts, opts ...pkger.ServiceSetterFn) (pkger.SVC, influxdb.OrganizationService, error) { httpClient, err := newHTTPClient() if err != nil { - return nil, err + return nil, nil, err + } + + orgSvc := &ihttp.OrganizationService{ + Client: httpClient, } return pkger.NewService( @@ -518,10 +518,11 @@ func newPkgerSVC(cliReqOpts httpClientOpts, opts ...pkger.ServiceSetterFn) (pkge pkger.WithBucketSVC(&ihttp.BucketService{Client: httpClient}), pkger.WithDashboardSVC(&ihttp.DashboardService{Client: httpClient}), pkger.WithLabelSVC(&ihttp.LabelService{Client: httpClient}), + pkger.WithNoticationEndpointSVC(ihttp.NewNotificationEndpointService(httpClient)), pkger.WithTelegrafSVC(ihttp.NewTelegrafService(httpClient)), pkger.WithVariableSVC(&ihttp.VariableService{Client: httpClient}), )..., - ), nil + ), orgSvc, nil } func pkgFromReader(stdin io.Reader) (*pkger.Pkg, error) { @@ -654,6 +655,18 @@ func (b *cmdPkgBuilder) printPkgDiff(diff pkger.Diff) { }) } + if endpoints := diff.NotificationEndpoints; len(endpoints) > 0 { + headers := []string{"New", "ID", "Name"} + tablePrintFn("NOTIFICATION ENDPOINTS", headers, len(endpoints), func(i int) []string { + v := endpoints[i] + return []string{ + boolDiff(v.IsNew()), + v.ID.String(), + v.Name, + } + }) + } + if teles := diff.Telegrafs; len(diff.Telegrafs) > 0 { headers := []string{"New", "Name", "Description"} tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string { @@ -733,6 +746,19 @@ func (b *cmdPkgBuilder) printPkgSummary(sum pkger.Summary) { }) } + if endpoints := sum.NotificationEndpoints; len(endpoints) > 0 { + headers := []string{"ID", "Name", "Description", "Status"} + tablePrintFn("NOTIFICATION ENDPOINTS", headers, len(endpoints), func(i int) []string { 
+ v := endpoints[i] + return []string{ + v.GetID().String(), + v.GetName(), + v.GetDescription(), + string(v.GetStatus()), + } + }) + } + if teles := sum.TelegrafConfigs; len(teles) > 0 { headers := []string{"ID", "Name", "Description"} tablePrintFn("TELEGRAF CONFIGS", headers, len(teles), func(i int) []string { diff --git a/cmd/influx/pkg_test.go b/cmd/influx/pkg_test.go index ffabbfc2b6e..11fa7825c37 100644 --- a/cmd/influx/pkg_test.go +++ b/cmd/influx/pkg_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/errors" + "github.com/influxdata/influxdb/mock" "github.com/influxdata/influxdb/pkger" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" @@ -20,9 +21,13 @@ import ( ) func Test_Pkg(t *testing.T) { - fakeSVCFn := func(svc pkger.SVC) pkgSVCFn { - return func(opts httpClientOpts, _ ...pkger.ServiceSetterFn) (pkger.SVC, error) { - return svc, nil + fakeSVCFn := func(svc pkger.SVC) pkgSVCsFn { + return func(opts httpClientOpts, _ ...pkger.ServiceSetterFn) (pkger.SVC, influxdb.OrganizationService, error) { + return svc, &mock.OrganizationService{ + FindOrganizationF: func(ctx context.Context, filter influxdb.OrganizationFilter) (*influxdb.Organization, error) { + return &influxdb.Organization{ID: influxdb.ID(9000), Name: "influxdata"}, nil + }, + }, nil } } @@ -126,6 +131,24 @@ func Test_Pkg(t *testing.T) { Version: "new version", }, }, + { + pkgFileArgs: pkgFileArgs{ + name: "yaml out", + encoding: pkger.EncodingYAML, + filename: "pkg_0.yml", + flags: []flagArg{ + {name: "name", val: "new name"}, + {name: "description", val: "new desc"}, + {name: "version", val: "new version"}, + {name: "org", val: "influxdata"}, + }, + }, + expectedMeta: pkger.Metadata{ + Name: "new name", + Description: "new desc", + Version: "new version", + }, + }, } cmdFn := func() *cobra.Command { diff --git a/cmd/influx/user.go b/cmd/influx/user.go index ed46b0f964a..c67643552d1 100644 --- a/cmd/influx/user.go +++ 
b/cmd/influx/user.go @@ -3,7 +3,6 @@ package main import ( "context" "errors" - "fmt" "os" platform "github.com/influxdata/influxdb" @@ -170,36 +169,27 @@ func userCreateF(cmd *cobra.Command, args []string) error { return nil } - var orgIDStr string - - if userCreateFlags.orgID != "" { - orgIDStr = userCreateFlags.orgID - } else if userCreateFlags.org != "" { - orgSvc, err := newOrganizationService() - if err != nil { - return fmt.Errorf("failed to initialize organization service client: %v", err) - } - - filter := platform.OrganizationFilter{Name: &bucketCreateFlags.org} - org, err := orgSvc.FindOrganization(context.Background(), filter) - if err != nil { - return fmt.Errorf("%v", err) - } + orgSVC, err := newOrganizationService() + if err != nil { + return err + } - orgIDStr = org.ID.GoString() + orgID, err := getOrgID(orgSVC, userCreateFlags.orgID, userCreateFlags.org) + if err != nil { + return err } + pass := userCreateFlags.password - if orgIDStr == "" && pass == "" { + if orgID == 0 && pass == "" { return writeOutput([]string{"ID", "Name"}, user.ID.String(), user.Name) } - if pass != "" && orgIDStr == "" { + if pass != "" && orgID == 0 { return errors.New("an org id is required when providing a user password") } - orgID, err := platform.IDFromString(orgIDStr) if err != nil { - return errors.New("an invalid org ID provided: " + orgIDStr) + return errors.New("an invalid org ID provided: " + orgID.GoString()) } c, err := newHTTPClient() @@ -215,7 +205,7 @@ func userCreateF(cmd *cobra.Command, args []string) error { UserID: user.ID, UserType: platform.Member, ResourceType: platform.OrgsResourceType, - ResourceID: *orgID, + ResourceID: orgID, }) if err != nil { return err diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 37ae49f70d6..d8aab08e326 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -647,11 +647,11 @@ func (m *Launcher) run(ctx context.Context) (err error) { sch, sm, err := 
scheduler.NewScheduler( executor, taskbackend.NewSchedulableTaskService(m.kvService), - scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) { + scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledFor time.Time, err error) { schLogger.Info( "error in scheduler run", zap.String("taskID", platform.ID(taskID).String()), - zap.Time("scheduledAt", scheduledAt), + zap.Time("scheduledFor", scheduledFor), zap.Error(err)) }), ) @@ -840,6 +840,7 @@ func (m *Launcher) run(ctx context.Context) (err error) { pkger.WithBucketSVC(authorizer.NewBucketService(b.BucketService)), pkger.WithDashboardSVC(authorizer.NewDashboardService(b.DashboardService)), pkger.WithLabelSVC(authorizer.NewLabelService(b.LabelService)), + pkger.WithNoticationEndpointSVC(authorizer.NewNotificationEndpointService(b.NotificationEndpointService, b.UserResourceMappingService, b.OrganizationService)), pkger.WithTelegrafSVC(authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService)), pkger.WithVariableSVC(authorizer.NewVariableService(b.VariableService)), ) diff --git a/cmd/influxd/launcher/launcher_helpers.go b/cmd/influxd/launcher/launcher_helpers.go index 7f5b81bb754..6a37c4ad3d7 100644 --- a/cmd/influxd/launcher/launcher_helpers.go +++ b/cmd/influxd/launcher/launcher_helpers.go @@ -344,6 +344,11 @@ func (tl *TestLauncher) LabelService(tb testing.TB) *http.LabelService { return &http.LabelService{Client: tl.HTTPClient(tb)} } +func (tl *TestLauncher) NotificationEndpointService(tb testing.TB) *http.NotificationEndpointService { + tb.Helper() + return http.NewNotificationEndpointService(tl.HTTPClient(tb)) +} + func (tl *TestLauncher) TelegrafService(tb testing.TB) *http.TelegrafService { tb.Helper() return http.NewTelegrafService(tl.HTTPClient(tb)) diff --git a/cmd/influxd/launcher/pkger_test.go b/cmd/influxd/launcher/pkger_test.go index ce8ccfbecdc..c75d045c2bd 100644 --- 
a/cmd/influxd/launcher/pkger_test.go +++ b/cmd/influxd/launcher/pkger_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "sync" "testing" "time" @@ -25,6 +24,7 @@ func TestLauncher_Pkger(t *testing.T) { pkger.WithBucketSVC(l.BucketService(t)), pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), + pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -52,6 +52,7 @@ func TestLauncher_Pkger(t *testing.T) { LabelService: l.LabelService(t), killCount: 2, // hits error on 3rd attempt at creating a mapping }), + pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -79,6 +80,18 @@ func TestLauncher_Pkger(t *testing.T) { require.NoError(t, err) assert.Empty(t, dashs) + endpoints, _, err := l.NotificationEndpointService(t).FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{ + OrgID: &l.Org.ID, + }) + require.NoError(t, err) + assert.Empty(t, endpoints) + + teles, _, err := l.TelegrafService(t).FindTelegrafConfigs(ctx, influxdb.TelegrafConfigFilter{ + OrgID: &l.Org.ID, + }) + require.NoError(t, err) + assert.Empty(t, teles) + vars, err := l.VariableService(t).FindVariables(ctx, influxdb.VariableFilter{OrganizationID: &l.Org.ID}) require.NoError(t, err) assert.Empty(t, vars) @@ -132,6 +145,8 @@ func TestLauncher_Pkger(t *testing.T) { assert.True(t, diffVars[0].IsNew()) require.Len(t, diff.Dashboards, 1) + require.Len(t, diff.NotificationEndpoints, 1) + require.Len(t, diff.Telegrafs, 1) labels := sum.Labels require.Len(t, labels, 1) @@ -148,6 +163,18 @@ func TestLauncher_Pkger(t *testing.T) { assert.Equal(t, "desc1", dashs[0].Description) hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1") + endpoints := sum.NotificationEndpoints + require.Len(t, endpoints, 1) + assert.Equal(t, 
"http_none_auth_notification_endpoint", endpoints[0].GetName()) + assert.Equal(t, "http none auth desc", endpoints[0].GetDescription()) + hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1") + + teles := sum.TelegrafConfigs + require.Len(t, teles, 1) + assert.Equal(t, "first_tele_config", teles[0].Name) + assert.Equal(t, "desc", teles[0].Description) + hasLabelAssociations(t, teles[0].LabelAssociations, 1, "label_1") + vars := sum.Variables require.Len(t, vars, 1) assert.Equal(t, "var_query_1", vars[0].Name) @@ -168,25 +195,41 @@ func TestLauncher_Pkger(t *testing.T) { labels := sum1.Labels require.Len(t, labels, 1) - assert.NotEqual(t, influxdb.ID(0), labels[0].ID) + assert.NotZero(t, labels[0].ID) assert.Equal(t, "label_1", labels[0].Name) bkts := sum1.Buckets require.Len(t, bkts, 1) - assert.NotEqual(t, influxdb.ID(0), bkts[0].ID) + assert.NotZero(t, bkts[0].ID) assert.Equal(t, "rucket_1", bkts[0].Name) hasLabelAssociations(t, bkts[0].LabelAssociations, 1, "label_1") dashs := sum1.Dashboards require.Len(t, dashs, 1) - assert.NotEqual(t, influxdb.ID(0), dashs[0].ID) + assert.NotZero(t, dashs[0].ID) assert.Equal(t, "dash_1", dashs[0].Name) assert.Equal(t, "desc1", dashs[0].Description) hasLabelAssociations(t, dashs[0].LabelAssociations, 1, "label_1") + endpoints := sum1.NotificationEndpoints + require.Len(t, endpoints, 1) + assert.NotZero(t, endpoints[0].GetID()) + assert.Equal(t, "http_none_auth_notification_endpoint", endpoints[0].GetName()) + assert.Equal(t, "http none auth desc", endpoints[0].GetDescription()) + assert.Equal(t, influxdb.TaskStatusInactive, string(endpoints[0].GetStatus())) + hasLabelAssociations(t, endpoints[0].LabelAssociations, 1, "label_1") + + teles := sum1.TelegrafConfigs + require.Len(t, teles, 1) + assert.NotZero(t, teles[0].ID) + assert.Equal(t, l.Org.ID, teles[0].OrgID) + assert.Equal(t, "first_tele_config", teles[0].Name) + assert.Equal(t, "desc", teles[0].Description) + assert.Equal(t, telConf, teles[0].Config) + 
vars := sum1.Variables require.Len(t, vars, 1) - assert.NotEqual(t, influxdb.ID(0), vars[0].ID) + assert.NotZero(t, vars[0].ID) assert.Equal(t, "var_query_1", vars[0].Name) hasLabelAssociations(t, vars[0].LabelAssociations, 1, "label_1") varArgs := vars[0].Arguments @@ -197,14 +240,6 @@ func TestLauncher_Pkger(t *testing.T) { Language: "flux", }, varArgs.Values) - teles := sum1.TelegrafConfigs - require.Len(t, teles, 1) - assert.NotZero(t, teles[0].ID) - assert.Equal(t, l.Org.ID, teles[0].OrgID) - assert.Equal(t, "first_tele_config", teles[0].Name) - assert.Equal(t, "desc", teles[0].Description) - assert.Equal(t, telConf, teles[0].Config) - newSumMapping := func(id influxdb.ID, name string, rt influxdb.ResourceType) pkger.SummaryLabelMapping { return pkger.SummaryLabelMapping{ ResourceName: name, @@ -218,7 +253,7 @@ func TestLauncher_Pkger(t *testing.T) { } mappings := sum1.LabelMappings - require.Len(t, mappings, 4) + require.Len(t, mappings, 5) hasMapping(t, mappings, newSumMapping(bkts[0].ID, bkts[0].Name, influxdb.BucketsResourceType)) hasMapping(t, mappings, newSumMapping(influxdb.ID(dashs[0].ID), dashs[0].Name, influxdb.DashboardsResourceType)) hasMapping(t, mappings, newSumMapping(vars[0].ID, vars[0].Name, influxdb.VariablesResourceType)) @@ -232,6 +267,7 @@ func TestLauncher_Pkger(t *testing.T) { require.Equal(t, sum1.Buckets, sum2.Buckets) require.Equal(t, sum1.Labels, sum2.Labels) + require.Equal(t, sum1.NotificationEndpoints, sum2.NotificationEndpoints) require.Equal(t, sum1.Variables, sum2.Variables) // dashboards should be new @@ -329,6 +365,7 @@ func TestLauncher_Pkger(t *testing.T) { }), pkger.WithDashboardSVC(l.DashboardService(t)), pkger.WithLabelSVC(l.LabelService(t)), + pkger.WithNoticationEndpointSVC(l.NotificationEndpointService(t)), pkger.WithTelegrafSVC(l.TelegrafService(t)), pkger.WithVariableSVC(l.VariableService(t)), ) @@ -433,6 +470,16 @@ spec: - kind: Label name: label_1 config: %+q + - kind: NotificationEndpointHTTP + name: 
http_none_auth_notification_endpoint + type: none + description: http none auth desc + method: GET + url: https://www.example.com/endpoint/noneauth + status: inactive + associations: + - kind: Label + name: label_1 `, telConf) const updatePkgYMLStr = `apiVersion: 0.1.0 @@ -480,18 +527,14 @@ func (f *fakeBucketSVC) UpdateBucket(ctx context.Context, id influxdb.ID, upd in type fakeLabelSVC struct { influxdb.LabelService - countMu sync.Mutex - callCount int + callCount mock.SafeCount killCount int } func (f *fakeLabelSVC) CreateLabelMapping(ctx context.Context, m *influxdb.LabelMapping) error { - f.countMu.Lock() - if f.callCount == f.killCount { - f.countMu.Unlock() + defer f.callCount.IncrFn()() + if f.callCount.Count() == f.killCount { return errors.New("reached kill count") } - f.callCount++ - f.countMu.Unlock() return f.LabelService.CreateLabelMapping(ctx, m) } diff --git a/http/api_handler.go b/http/api_handler.go index e4b93dafd23..b2704607ed9 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -184,8 +184,8 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { telegrafBackend := NewTelegrafBackend(b.Logger.With(zap.String("handler", "telegraf")), b) telegrafBackend.TelegrafService = authorizer.NewTelegrafConfigService(b.TelegrafService, b.UserResourceMappingService) + h.Mount(prefixTelegrafPlugins, NewTelegrafHandler(b.Logger, telegrafBackend)) h.Mount(prefixTelegraf, NewTelegrafHandler(b.Logger, telegrafBackend)) - h.Mount(prefixTelegrafs, NewTelegrafHandler(b.Logger, telegrafBackend)) userBackend := NewUserBackend(b.Logger.With(zap.String("handler", "user")), b) userBackend.UserService = authorizer.NewUserService(b.UserService) diff --git a/http/bucket_service.go b/http/bucket_service.go index 3557cb4893d..d4ad38abc93 100644 --- a/http/bucket_service.go +++ b/http/bucket_service.go @@ -852,7 +852,7 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) er var br bucketResponse err := s.Client. 
- Post(httpc.BodyJSON(newBucket(b)), prefixBuckets). + PostJSON(newBucket(b), prefixBuckets). DecodeJSON(&br). Do(ctx) if err != nil { @@ -872,7 +872,7 @@ func (s *BucketService) CreateBucket(ctx context.Context, b *influxdb.Bucket) er func (s *BucketService) UpdateBucket(ctx context.Context, id influxdb.ID, upd influxdb.BucketUpdate) (*influxdb.Bucket, error) { var br bucketResponse err := s.Client. - Patch(httpc.BodyJSON(newBucketUpdate(&upd)), bucketIDPath(id)). + PatchJSON(newBucketUpdate(&upd), bucketIDPath(id)). DecodeJSON(&br). Do(ctx) if err != nil { diff --git a/http/dashboard_service.go b/http/dashboard_service.go index 4cdb59cc5c1..d56fc91225f 100644 --- a/http/dashboard_service.go +++ b/http/dashboard_service.go @@ -1129,7 +1129,7 @@ func (s *DashboardService) FindDashboards(ctx context.Context, filter platform.D // CreateDashboard creates a new dashboard and sets b.ID with the new identifier. func (s *DashboardService) CreateDashboard(ctx context.Context, d *platform.Dashboard) error { return s.Client. - Post(httpc.BodyJSON(d), prefixDashboards). + PostJSON(d, prefixDashboards). DecodeJSON(d). Do(ctx) } @@ -1139,7 +1139,7 @@ func (s *DashboardService) CreateDashboard(ctx context.Context, d *platform.Dash func (s *DashboardService) UpdateDashboard(ctx context.Context, id platform.ID, upd platform.DashboardUpdate) (*platform.Dashboard, error) { var d platform.Dashboard err := s.Client. - Patch(httpc.BodyJSON(upd), prefixDashboards, id.String()). + PatchJSON(upd, prefixDashboards, id.String()). DecodeJSON(&d). Do(ctx) if err != nil { @@ -1164,7 +1164,7 @@ func (s *DashboardService) DeleteDashboard(ctx context.Context, id platform.ID) // AddDashboardCell adds a cell to a dashboard. func (s *DashboardService) AddDashboardCell(ctx context.Context, id platform.ID, c *platform.Cell, opts platform.AddDashboardCellOptions) error { return s.Client. - Post(httpc.BodyJSON(c), cellPath(id)). + PostJSON(c, cellPath(id)). DecodeJSON(c). 
Do(ctx) } @@ -1186,7 +1186,7 @@ func (s *DashboardService) UpdateDashboardCell(ctx context.Context, dashboardID, var c platform.Cell err := s.Client. - Patch(httpc.BodyJSON(upd), dashboardCellIDPath(dashboardID, cellID)). + PatchJSON(upd, dashboardCellIDPath(dashboardID, cellID)). DecodeJSON(&c). Do(ctx) if err != nil { @@ -1214,7 +1214,7 @@ func (s *DashboardService) GetDashboardCellView(ctx context.Context, dashboardID func (s *DashboardService) UpdateDashboardCellView(ctx context.Context, dashboardID, cellID platform.ID, upd platform.ViewUpdate) (*platform.View, error) { var dcv dashboardCellViewResponse err := s.Client. - Patch(httpc.BodyJSON(upd), cellViewPath(dashboardID, cellID)). + PatchJSON(upd, cellViewPath(dashboardID, cellID)). DecodeJSON(&dcv). Do(ctx) if err != nil { @@ -1226,7 +1226,7 @@ func (s *DashboardService) UpdateDashboardCellView(ctx context.Context, dashboar // ReplaceDashboardCells replaces all cells in a dashboard func (s *DashboardService) ReplaceDashboardCells(ctx context.Context, id platform.ID, cs []*platform.Cell) error { return s.Client. - Put(httpc.BodyJSON(cs), cellPath(id)). + PutJSON(cs, cellPath(id)). // TODO: previous implementation did not do anything with the response except validate it is valid json. // seems likely we should have to overwrite (:sadpanda:) the incoming cs... DecodeJSON(&dashboardCellsResponse{}). diff --git a/http/label_service.go b/http/label_service.go index 4008bf8d774..2d3fb381a77 100644 --- a/http/label_service.go +++ b/http/label_service.go @@ -582,7 +582,7 @@ func (s *LabelService) FindResourceLabels(ctx context.Context, filter influxdb.L func (s *LabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error { var lr labelResponse err := s.Client. - Post(httpc.BodyJSON(l), prefixLabels). + PostJSON(l, prefixLabels). DecodeJSON(&lr). 
Do(ctx) if err != nil { @@ -598,7 +598,7 @@ func (s *LabelService) CreateLabel(ctx context.Context, l *influxdb.Label) error func (s *LabelService) UpdateLabel(ctx context.Context, id influxdb.ID, upd influxdb.LabelUpdate) (*influxdb.Label, error) { var lr labelResponse err := s.Client. - Patch(httpc.BodyJSON(upd), labelIDPath(id)). + PatchJSON(upd, labelIDPath(id)). DecodeJSON(&lr). Do(ctx) if err != nil { @@ -622,7 +622,7 @@ func (s *LabelService) CreateLabelMapping(ctx context.Context, m *influxdb.Label urlPath := resourceIDPath(m.ResourceType, m.ResourceID, "labels") return s.Client. - Post(httpc.BodyJSON(m), urlPath). + PostJSON(m, urlPath). DecodeJSON(m). Do(ctx) } diff --git a/http/notification_endpoint.go b/http/notification_endpoint.go index 77fbf3b84a7..026185de071 100644 --- a/http/notification_endpoint.go +++ b/http/notification_endpoint.go @@ -609,7 +609,7 @@ var _ influxdb.NotificationEndpointService = (*NotificationEndpointService)(nil) // FindNotificationEndpointByID returns a single notification endpoint by ID. func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) { - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. Get(prefixNotificationEndpoints, id.String()). DecodeJSON(&resp). @@ -617,7 +617,7 @@ func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.C if err != nil { return nil, err } - return resp.NotificationEndpoint, nil + return resp.endpoint, nil } // FindNotificationEndpoints returns a list of notification endpoints that match filter and the total count of matching notification endpoints. 
@@ -634,7 +634,9 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont params = append(params, [2]string{"org", *filter.Org}) } - var resp notificationEndpointsResponse + var resp struct { + Endpoints []notificationEndpointDecoder `json:"notificationEndpoints"` + } err := s.Client. Get(prefixNotificationEndpoints). QueryParams(params...). @@ -645,11 +647,10 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont } var endpoints []influxdb.NotificationEndpoint - for _, e := range resp.NotificationEndpoints { - endpoints = append(endpoints, e.NotificationEndpoint) + for _, e := range resp.Endpoints { + endpoints = append(endpoints, e.endpoint) } - - return endpoints, len(resp.NotificationEndpoints), nil + return endpoints, len(endpoints), nil } // CreateNotificationEndpoint creates a new notification endpoint and sets b.ID with the new identifier. @@ -658,32 +659,33 @@ func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Cont func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, ne influxdb.NotificationEndpoint, userID influxdb.ID) error { // userID is ignored here since server reads it off // the token/auth. its a nothing burger here - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. - Post(httpc.BodyJSON(ne), prefixNotificationEndpoints). + PostJSON(¬ificationEndpointEncoder{ne: ne}, prefixNotificationEndpoints). DecodeJSON(&resp). Do(ctx) if err != nil { return err } // :sadpanda: - ne.SetID(resp.GetID()) - ne.SetOrgID(resp.GetOrgID()) + ne.SetID(resp.endpoint.GetID()) + ne.SetOrgID(resp.endpoint.GetOrgID()) return nil } // UpdateNotificationEndpoint updates a single notification endpoint. // Returns the new notification endpoint after update. 
-func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) { - var resp notificationEndpointResponse +func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, ne influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) { + // userID is ignored since userID is grabbed off the http auth set on the client + var resp notificationEndpointDecoder err := s.Client. - Put(httpc.BodyJSON(nr), prefixNotificationEndpoints, id.String()). + PutJSON(¬ificationEndpointEncoder{ne: ne}, prefixNotificationEndpoints, id.String()). DecodeJSON(&resp). Do(ctx) if err != nil { return nil, err } - return resp.NotificationEndpoint, nil + return resp.endpoint, nil } // PatchNotificationEndpoint updates a single notification endpoint with changeset. @@ -693,18 +695,71 @@ func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Cont return nil, err } - var resp notificationEndpointResponse + var resp notificationEndpointDecoder err := s.Client. - Patch(httpc.BodyJSON(upd), prefixNotificationEndpoints, id.String()). + PatchJSON(upd, prefixNotificationEndpoints, id.String()). DecodeJSON(&resp). Do(ctx) if err != nil { return nil, err } - return resp.NotificationEndpoint, nil + return resp.endpoint, nil } // DeleteNotificationEndpoint removes a notification endpoint by ID, returns secret fields, orgID for further deletion. -func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) (flds []influxdb.SecretField, orgID influxdb.ID, err error) { - panic("not implemented") +// TODO: axe this delete design, makes little sense in how its currently being done. 
Right now, as an http client, +// I am forced to know how the store handles this and then figure out what the server does in between me and that store, +// then see what falls out :flushed... for now returning nothing for secrets, orgID, and only returning an error. This makes +// the code/design smell super obvious imo +func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { + err := s.Client. + Delete(prefixNotificationEndpoints, id.String()). + Do(ctx) + return nil, 0, err +} + +type notificationEndpointEncoder struct { + ne influxdb.NotificationEndpoint +} + +func (n *notificationEndpointEncoder) MarshalJSON() ([]byte, error) { + b, err := json.Marshal(n.ne) + if err != nil { + return nil, err + } + + ughhh := make(map[string]interface{}) + if err := json.Unmarshal(b, &ughhh); err != nil { + return nil, err + } + n.ne.BackfillSecretKeys() + + // this makes me queezy and altogether sad + fieldMap := map[string]string{ + "-password": "password", + "-routing-key": "routingKey", + "-token": "token", + "-username": "username", + } + for _, sec := range n.ne.SecretFields() { + var v string + if sec.Value != nil { + v = *sec.Value + } + ughhh[fieldMap[sec.Key]] = v + } + return json.Marshal(ughhh) +} + +type notificationEndpointDecoder struct { + endpoint influxdb.NotificationEndpoint +} + +func (n *notificationEndpointDecoder) UnmarshalJSON(b []byte) error { + newEndpoint, err := endpoint.UnmarshalJSON(b) + if err != nil { + return err + } + n.endpoint = newEndpoint + return nil } diff --git a/http/org_service.go b/http/org_service.go index dd958db2ad9..93269018db8 100644 --- a/http/org_service.go +++ b/http/org_service.go @@ -693,7 +693,7 @@ func (s *OrganizationService) CreateOrganization(ctx context.Context, o *influxd } return s.Client. - Post(httpc.BodyJSON(o), organizationPath). + PostJSON(o, organizationPath). DecodeJSON(o). 
Do(ctx) } @@ -708,7 +708,7 @@ func (s *OrganizationService) UpdateOrganization(ctx context.Context, id influxd var o influxdb.Organization err := s.Client. - Patch(httpc.BodyJSON(upd), organizationPath, id.String()). + PatchJSON(upd, organizationPath, id.String()). DecodeJSON(&o). Do(ctx) if err != nil { diff --git a/http/swagger.yml b/http/swagger.yml index be38db08ea0..ebe173f6bd6 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -7240,6 +7240,17 @@ components: type: string labelID: type: string + notificationEndpoints: + type: array + items: + allOf: + - $ref: "#/components/schemas/NotificationEndpointDiscrimator" + - type: object + properties: + labelAssociations: + type: array + items: + $ref: "#/components/schemas/Label" telegrafConfigs: type: array items: @@ -7341,6 +7352,19 @@ components: type: string labelName: type: string + notificationEndpoints: + type: array + items: + type: object + properties: + id: + type: string + name: + type: string + new: + $ref: "#/components/schemas/NotificationEndpointDiscrimator" + old: + $ref: "#/components/schemas/NotificationEndpointDiscrimator" telegrafConfigs: type: array items: @@ -10341,6 +10365,7 @@ components: type: string NotificationEndpointUpdate: type: object + properties: name: type: string diff --git a/http/telegraf.go b/http/telegraf.go index aea95938396..25ade8cefe7 100644 --- a/http/telegraf.go +++ b/http/telegraf.go @@ -57,7 +57,7 @@ type TelegrafHandler struct { } const ( - prefixTelegrafs = "/api/v2/telegrafs" + prefixTelegraf = "/api/v2/telegrafs" telegrafsIDPath = "/api/v2/telegrafs/:id" telegrafsIDMembersPath = "/api/v2/telegrafs/:id/members" telegrafsIDMembersIDPath = "/api/v2/telegrafs/:id/members/:userID" @@ -66,8 +66,8 @@ const ( telegrafsIDLabelsPath = "/api/v2/telegrafs/:id/labels" telegrafsIDLabelsIDPath = "/api/v2/telegrafs/:id/labels/:lid" - prefixTelegraf = "/api/v2/telegraf" - telegrafPluginsPath = "/api/v2/telegraf/plugins" + prefixTelegrafPlugins = "/api/v2/telegraf" + 
telegrafPluginsPath = "/api/v2/telegraf/plugins" ) // NewTelegrafHandler returns a new instance of TelegrafHandler. @@ -83,8 +83,8 @@ func NewTelegrafHandler(log *zap.Logger, b *TelegrafBackend) *TelegrafHandler { UserService: b.UserService, OrganizationService: b.OrganizationService, } - h.HandlerFunc("POST", prefixTelegrafs, h.handlePostTelegraf) - h.HandlerFunc("GET", prefixTelegrafs, h.handleGetTelegrafs) + h.HandlerFunc("POST", prefixTelegraf, h.handlePostTelegraf) + h.HandlerFunc("GET", prefixTelegraf, h.handleGetTelegrafs) h.HandlerFunc("GET", telegrafsIDPath, h.handleGetTelegraf) h.HandlerFunc("DELETE", telegrafsIDPath, h.handleDeleteTelegraf) h.HandlerFunc("PUT", telegrafsIDPath, h.handlePutTelegraf) @@ -427,7 +427,7 @@ var _ platform.TelegrafConfigStore = (*TelegrafService)(nil) func (s *TelegrafService) FindTelegrafConfigByID(ctx context.Context, id platform.ID) (*platform.TelegrafConfig, error) { var cfg platform.TelegrafConfig err := s.client. - Get(prefixTelegrafs, id.String()). + Get(prefixTelegraf, id.String()). Header("Accept", "application/json"). DecodeJSON(&cfg). Do(ctx) @@ -458,7 +458,7 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f platform.Te Configs []*platform.TelegrafConfig `json:"configurations"` } err := s.client. - Get(prefixTelegrafs). + Get(prefixTelegraf). QueryParams(params...). DecodeJSON(&resp). Do(ctx) @@ -473,7 +473,7 @@ func (s *TelegrafService) FindTelegrafConfigs(ctx context.Context, f platform.Te func (s *TelegrafService) CreateTelegrafConfig(ctx context.Context, tc *platform.TelegrafConfig, userID platform.ID) error { var teleResp platform.TelegrafConfig err := s.client. - Post(httpc.BodyJSON(tc), prefixTelegrafs). + PostJSON(tc, prefixTelegraf). DecodeJSON(&teleResp). Do(ctx) if err != nil { @@ -492,6 +492,6 @@ func (s *TelegrafService) UpdateTelegrafConfig(ctx context.Context, id platform. // DeleteTelegrafConfig removes a telegraf config by ID. 
func (s *TelegrafService) DeleteTelegrafConfig(ctx context.Context, id platform.ID) error { return s.client. - Delete(prefixTelegrafs, id.String()). + Delete(prefixTelegraf, id.String()). Do(ctx) } diff --git a/http/user_resource_mapping_service.go b/http/user_resource_mapping_service.go index c7a7abed80a..b0914f50f16 100644 --- a/http/user_resource_mapping_service.go +++ b/http/user_resource_mapping_service.go @@ -298,7 +298,7 @@ func (s *UserResourceMappingService) CreateUserResourceMapping(ctx context.Conte urlPath := resourceIDPath(m.ResourceType, m.ResourceID, string(m.UserType)+"s") return s.Client. - Post(httpc.BodyJSON(influxdb.User{ID: m.UserID}), urlPath). + PostJSON(influxdb.User{ID: m.UserID}, urlPath). DecodeJSON(m). Do(ctx) } diff --git a/http/variable_service.go b/http/variable_service.go index 7374f9546a5..b5e6be2d6c7 100644 --- a/http/variable_service.go +++ b/http/variable_service.go @@ -491,7 +491,7 @@ func (s *VariableService) CreateVariable(ctx context.Context, m *platform.Variab } return s.Client. - Post(httpc.BodyJSON(m), prefixVariables). + PostJSON(m, prefixVariables). DecodeJSON(m). Do(ctx) } @@ -500,7 +500,7 @@ func (s *VariableService) CreateVariable(ctx context.Context, m *platform.Variab func (s *VariableService) UpdateVariable(ctx context.Context, id platform.ID, update *platform.VariableUpdate) (*platform.Variable, error) { var m platform.Variable err := s.Client. - Patch(httpc.BodyJSON(update), prefixVariables, id.String()). + PatchJSON(update, prefixVariables, id.String()). DecodeJSON(&m). Do(ctx) if err != nil { @@ -513,7 +513,7 @@ func (s *VariableService) UpdateVariable(ctx context.Context, id platform.ID, up // ReplaceVariable replaces a single variable func (s *VariableService) ReplaceVariable(ctx context.Context, variable *platform.Variable) error { return s.Client. - Put(httpc.BodyJSON(variable), prefixVariables, variable.ID.String()). + PutJSON(variable, prefixVariables, variable.ID.String()). DecodeJSON(variable). 
Do(ctx) } diff --git a/inmem/kv.go b/inmem/kv.go index da719fd11c0..287f7450377 100644 --- a/inmem/kv.go +++ b/inmem/kv.go @@ -29,10 +29,7 @@ func NewKVStore() *KVStore { func (s *KVStore) View(ctx context.Context, fn func(kv.Tx) error) error { s.mu.RLock() defer s.mu.RUnlock() - if s.buckets == nil { - s.buckets = map[string]*Bucket{} - s.ro = map[string]*bucket{} - } + return fn(&Tx{ kv: s, writable: false, @@ -44,10 +41,6 @@ func (s *KVStore) View(ctx context.Context, fn func(kv.Tx) error) error { func (s *KVStore) Update(ctx context.Context, fn func(kv.Tx) error) error { s.mu.Lock() defer s.mu.Unlock() - if s.buckets == nil { - s.buckets = map[string]*Bucket{} - s.ro = map[string]*bucket{} - } return fn(&Tx{ kv: s, diff --git a/inmem/kv_test.go b/inmem/kv_test.go index b8b503b1903..8aaa35bf9e7 100644 --- a/inmem/kv_test.go +++ b/inmem/kv_test.go @@ -61,7 +61,7 @@ func TestKVStore_Buckets(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := &inmem.KVStore{} + s := inmem.NewKVStore() err := s.Update(context.Background(), func(tx kv.Tx) error { for _, b := range tt.buckets { if _, err := tx.Bucket([]byte(b)); err != nil { diff --git a/kv/task.go b/kv/task.go index 6cceb264be8..fe863a94b82 100644 --- a/kv/task.go +++ b/kv/task.go @@ -1360,10 +1360,10 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, } // CreateRun creates a run with a scheduledFor time as now. 
-func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { +func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { - run, err := s.createRun(ctx, tx, taskID, scheduledFor) + run, err := s.createRun(ctx, tx, taskID, scheduledFor, runAt) if err != nil { return err } @@ -1372,13 +1372,14 @@ func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFo }) return r, err } -func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { +func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { id := s.IDGenerator.ID() run := influxdb.Run{ ID: id, TaskID: taskID, ScheduledFor: scheduledFor, + RunAt: runAt, Status: backend.RunScheduled.String(), Log: []influxdb.Log{}, } diff --git a/mock/notification_endpoint_service.go b/mock/notification_endpoint_service.go index d60d3f2939a..4a16d49f9ec 100644 --- a/mock/notification_endpoint_service.go +++ b/mock/notification_endpoint_service.go @@ -12,12 +12,18 @@ var _ influxdb.NotificationEndpointService = &NotificationEndpointService{} type NotificationEndpointService struct { *OrganizationService *UserResourceMappingService - FindNotificationEndpointByIDF func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) - FindNotificationEndpointsF func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) - CreateNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error - UpdateNotificationEndpointF func(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) 
(influxdb.NotificationEndpoint, error) - PatchNotificationEndpointF func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) - DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) + FindNotificationEndpointByIDF func(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) + FindNotificationEndpointByIDCalls SafeCount + FindNotificationEndpointsF func(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) + FindNotificationEndpointsCalls SafeCount + CreateNotificationEndpointF func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error + CreateNotificationEndpointCalls SafeCount + UpdateNotificationEndpointF func(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) + UpdateNotificationEndpointCalls SafeCount + PatchNotificationEndpointF func(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) + PatchNotificationEndpointCalls SafeCount + DeleteNotificationEndpointF func(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) + DeleteNotificationEndpointCalls SafeCount } func NewNotificationEndpointService() *NotificationEndpointService { @@ -47,33 +53,39 @@ func NewNotificationEndpointService() *NotificationEndpointService { // FindNotificationEndpointByID returns a single telegraf config by ID. 
func (s *NotificationEndpointService) FindNotificationEndpointByID(ctx context.Context, id influxdb.ID) (influxdb.NotificationEndpoint, error) { + defer s.FindNotificationEndpointByIDCalls.IncrFn()() return s.FindNotificationEndpointByIDF(ctx, id) } // FindNotificationEndpoints returns a list of notification rules that match filter and the total count of matching notification rules. // Additional options provide pagination & sorting. func (s *NotificationEndpointService) FindNotificationEndpoints(ctx context.Context, filter influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) { + defer s.FindNotificationEndpointsCalls.IncrFn()() return s.FindNotificationEndpointsF(ctx, filter, opt...) } // CreateNotificationEndpoint creates a new notification rule and sets ID with the new identifier. func (s *NotificationEndpointService) CreateNotificationEndpoint(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error { + defer s.CreateNotificationEndpointCalls.IncrFn()() return s.CreateNotificationEndpointF(ctx, nr, userID) } // UpdateNotificationEndpoint updates a single notification rule. // Returns the new notification rule after update. func (s *NotificationEndpointService) UpdateNotificationEndpoint(ctx context.Context, id influxdb.ID, nr influxdb.NotificationEndpoint, userID influxdb.ID) (influxdb.NotificationEndpoint, error) { + defer s.UpdateNotificationEndpointCalls.IncrFn()() return s.UpdateNotificationEndpointF(ctx, id, nr, userID) } // PatchNotificationEndpoint updates a single notification rule with changeset. // Returns the new notification rule after update. 
func (s *NotificationEndpointService) PatchNotificationEndpoint(ctx context.Context, id influxdb.ID, upd influxdb.NotificationEndpointUpdate) (influxdb.NotificationEndpoint, error) { + defer s.PatchNotificationEndpointCalls.IncrFn()() return s.PatchNotificationEndpointF(ctx, id, upd) } // DeleteNotificationEndpoint removes a notification rule by ID. func (s *NotificationEndpointService) DeleteNotificationEndpoint(ctx context.Context, id influxdb.ID) ([]influxdb.SecretField, influxdb.ID, error) { + defer s.DeleteNotificationEndpointCalls.IncrFn()() return s.DeleteNotificationEndpointF(ctx, id) } diff --git a/mock/task_service.go b/mock/task_service.go index 5586e60f6bd..42f14404321 100644 --- a/mock/task_service.go +++ b/mock/task_service.go @@ -72,7 +72,7 @@ func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, schedule type TaskControlService struct { CreateNextRunFn func(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) NextDueRunFn func(ctx context.Context, taskID influxdb.ID) (int64, error) - CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) + CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) CurrentlyRunningFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) @@ -87,8 +87,8 @@ func (tcs *TaskControlService) CreateNextRun(ctx context.Context, taskID influxd func (tcs *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) { return tcs.NextDueRunFn(ctx, taskID) } -func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { - return tcs.CreateRunFn(ctx, taskID, scheduledFor) +func (tcs 
*TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { + return tcs.CreateRunFn(ctx, taskID, scheduledFor, runAt) } func (tcs *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { return tcs.CurrentlyRunningFn(ctx, taskID) diff --git a/notification/endpoint/http.go b/notification/endpoint/http.go index bfb530cf8f6..e400342df13 100644 --- a/notification/endpoint/http.go +++ b/notification/endpoint/http.go @@ -121,10 +121,9 @@ func (s HTTP) Valid() error { return nil } -type httpAlias HTTP - // MarshalJSON implement json.Marshaler interface. func (s HTTP) MarshalJSON() ([]byte, error) { + type httpAlias HTTP return json.Marshal( struct { httpAlias diff --git a/pkg/httpc/client.go b/pkg/httpc/client.go index daf5cce837d..91e502e153c 100644 --- a/pkg/httpc/client.go +++ b/pkg/httpc/client.go @@ -90,16 +90,35 @@ func (c *Client) Patch(bFn BodyFn, urlPath string, rest ...string) *Req { return c.Req(http.MethodPatch, bFn, urlPath, rest...) } +// PatchJSON generates a PATCH request. This is to be used with value or pointer to value type. +// Providing a stream/reader will result in disappointment. +func (c *Client) PatchJSON(v interface{}, urlPath string, rest ...string) *Req { + return c.Patch(BodyJSON(v), urlPath, rest...) +} + // Post generates a POST request. func (c *Client) Post(bFn BodyFn, urlPath string, rest ...string) *Req { return c.Req(http.MethodPost, bFn, urlPath, rest...) } +// PostJSON generates a POST request and json encodes the body. This is to be +// used with value or pointer to value type. Providing a stream/reader will result +// in disappointment. +func (c *Client) PostJSON(v interface{}, urlPath string, rest ...string) *Req { + return c.Post(BodyJSON(v), urlPath, rest...) +} + // Put generates a PUT request. 
func (c *Client) Put(bFn BodyFn, urlPath string, rest ...string) *Req { return c.Req(http.MethodPut, bFn, urlPath, rest...) } +// PutJSON generates a PUT request. This is to be used with value or pointer to value type. +// Providing a stream/reader will result in disappointment. +func (c *Client) PutJSON(v interface{}, urlPath string, rest ...string) *Req { + return c.Put(BodyJSON(v), urlPath, rest...) +} + // Req constructs a request. func (c *Client) Req(method string, bFn BodyFn, urlPath string, rest ...string) *Req { bodyF := BodyEmpty diff --git a/pkg/httpc/client_test.go b/pkg/httpc/client_test.go index c6c60aca8e4..384a49d9e25 100644 --- a/pkg/httpc/client_test.go +++ b/pkg/httpc/client_test.go @@ -116,8 +116,11 @@ func TestClient(t *testing.T) { for _, encTest := range encodingTests { t.Run(encTest.name, func(t *testing.T) { + t.Helper() + for _, authTest := range authTests { fn := func(t *testing.T) { + t.Helper() client, fakeDoer := authTest.clientFn(tt.status, encTest.respFn, tt.clientOpts...) req := tt.reqFn(client, "/new/path/heres", tt.reqBody). 
@@ -283,6 +286,61 @@ func TestClient(t *testing.T) { }) } }) + + t.Run("PatchJSON PostJSON PutJSON with request bodies", func(t *testing.T) { + methods := []struct { + name string + methodCallFn func(client *Client, urlPath string, v interface{}) *Req + }{ + { + name: "PATCH", + methodCallFn: func(client *Client, urlPath string, v interface{}) *Req { + return client.PatchJSON(v, urlPath) + }, + }, + { + name: "POST", + methodCallFn: func(client *Client, urlPath string, v interface{}) *Req { + return client.PostJSON(v, urlPath) + }, + }, + { + name: "PUT", + methodCallFn: func(client *Client, urlPath string, v interface{}) *Req { + return client.PutJSON(v, urlPath) + }, + }, + } + + for _, method := range methods { + t.Run(method.name, func(t *testing.T) { + tests := []struct { + name string + testCase + }{ + { + name: "handles json req body", + testCase: testCase{ + status: 200, + reqFn: func(client *Client, urlPath string, body reqBody) *Req { + return method.methodCallFn(client, urlPath, body) + }, + reqBody: reqBody{ + Foo: "foo 1", + Bar: 31, + }, + }, + }, + } + + for _, tt := range tests { + tt.method = method.name + + t.Run(tt.name, testWithRespBody(tt.testCase)) + } + }) + } + }) } type fakeDoer struct { diff --git a/pkger/models.go b/pkger/models.go index a2474aedf57..a59cffdb9c8 100644 --- a/pkger/models.go +++ b/pkger/models.go @@ -19,6 +19,7 @@ const ( KindBucket Kind = "bucket" KindDashboard Kind = "dashboard" KindLabel Kind = "label" + KindNotificationEndpoint Kind = "notificationendpoint" KindNotificationEndpointPagerDuty Kind = "notificationendpointpagerduty" KindNotificationEndpointHTTP Kind = "notificationendpointhttp" KindNotificationEndpointSlack Kind = "notificationendpointslack" @@ -44,7 +45,7 @@ type Kind string // NewKind returns the kind parsed from the provided string. func NewKind(s string) Kind { - return Kind(strings.TrimSpace(strings.ToLower(s))) + return Kind(normStr(s)) } // String provides the kind in human readable form. 
@@ -79,7 +80,8 @@ func (k Kind) ResourceType() influxdb.ResourceType { return influxdb.DashboardsResourceType case KindLabel: return influxdb.LabelsResourceType - case KindNotificationEndpointHTTP, + case KindNotificationEndpoint, + KindNotificationEndpointHTTP, KindNotificationEndpointPagerDuty, KindNotificationEndpointSlack: return influxdb.NotificationEndpointResourceType @@ -133,12 +135,13 @@ type Metadata struct { // Diff is the result of a service DryRun call. The diff outlines // what is new and or updated from the current state of the platform. type Diff struct { - Buckets []DiffBucket `json:"buckets"` - Dashboards []DiffDashboard `json:"dashboards"` - Labels []DiffLabel `json:"labels"` - LabelMappings []DiffLabelMapping `json:"labelMappings"` - Telegrafs []DiffTelegraf `json:"telegrafConfigs"` - Variables []DiffVariable `json:"variables"` + Buckets []DiffBucket `json:"buckets"` + Dashboards []DiffDashboard `json:"dashboards"` + Labels []DiffLabel `json:"labels"` + LabelMappings []DiffLabelMapping `json:"labelMappings"` + NotificationEndpoints []DiffNotificationEndpoint `json:"notificationEndpoints"` + Telegrafs []DiffTelegraf `json:"telegrafConfigs"` + Variables []DiffVariable `json:"variables"` } // HasConflicts provides a binary t/f if there are any changes within package @@ -291,6 +294,41 @@ type DiffLabelMapping struct { LabelName string `json:"labelName"` } +// DiffNotificationEndpointValues are the varying values for a notification endpoint. +type DiffNotificationEndpointValues struct { + influxdb.NotificationEndpoint +} + +// DiffNotificationEndpoint is a diff of an individual notification endpoint. 
+type DiffNotificationEndpoint struct { + ID SafeID `json:"id"` + Name string `json:"name"` + New DiffNotificationEndpointValues `json:"new"` + Old *DiffNotificationEndpointValues `json:"old,omitempty"` // using omitempty here to signal there was no prev state with a nil +} + +func newDiffNotificationEndpoint(ne *notificationEndpoint, i influxdb.NotificationEndpoint) DiffNotificationEndpoint { + diff := DiffNotificationEndpoint{ + Name: ne.Name(), + New: DiffNotificationEndpointValues{ + NotificationEndpoint: ne.summarize().NotificationEndpoint, + }, + } + if i != nil { + diff.ID = SafeID(i.GetID()) + diff.Old = &DiffNotificationEndpointValues{ + NotificationEndpoint: i, + } + } + return diff +} + +// IsNew indicates if the resource will be new to the platform or if it edits +// an existing resource. +func (d DiffNotificationEndpoint) IsNew() bool { + return d.Old == nil +} + // DiffTelegraf is a diff of an individual telegraf. type DiffTelegraf struct { influxdb.TelegrafConfig @@ -488,6 +526,10 @@ func (b *bucket) ID() influxdb.ID { return b.id } +func (b *bucket) Labels() []*label { + return b.labels +} + func (b *bucket) Name() string { return b.name } @@ -524,6 +566,16 @@ func (b *bucket) shouldApply() bool { b.RetentionRules.RP() != b.existing.RetentionPeriod } +type mapperBuckets []*bucket + +func (b mapperBuckets) Association(i int) labelAssociater { + return b[i] +} + +func (b mapperBuckets) Len() int { + return len(b) +} + const ( retentionRuleTypeExpire = "expire" ) @@ -714,15 +766,19 @@ func (l *label) properties() map[string]string { } } +func (l *label) toInfluxLabel() influxdb.Label { + return influxdb.Label{ + ID: l.ID(), + OrgID: l.OrgID, + Name: l.Name(), + Properties: l.properties(), + } +} + func toInfluxLabels(labels ...*label) []influxdb.Label { var iLabels []influxdb.Label for _, l := range labels { - iLabels = append(iLabels, influxdb.Label{ - ID: l.ID(), - OrgID: l.OrgID, - Name: l.Name(), - Properties: l.properties(), - }) + iLabels = 
append(iLabels, l.toInfluxLabel()) } return iLabels } @@ -756,6 +812,7 @@ const ( ) const ( + fieldNotificationEndpointHTTPMethod = "method" fieldNotificationEndpointPassword = "password" fieldNotificationEndpointRoutingKey = "routingKey" fieldNotificationEndpointToken = "token" @@ -765,8 +822,11 @@ const ( type notificationEndpoint struct { kind notificationKind + id influxdb.ID + OrgID influxdb.ID name string description string + method string password string routingKey string status string @@ -776,6 +836,23 @@ type notificationEndpoint struct { username string labels sortedLabels + + existing influxdb.NotificationEndpoint +} + +func (n *notificationEndpoint) Exists() bool { + return n.existing != nil +} + +func (n *notificationEndpoint) ID() influxdb.ID { + if n.existing != nil { + return n.existing.GetID() + } + return n.id +} + +func (n *notificationEndpoint) Labels() []*label { + return n.labels } func (n *notificationEndpoint) Name() string { @@ -786,48 +863,74 @@ func (n *notificationEndpoint) ResourceType() influxdb.ResourceType { return KindNotificationEndpointSlack.ResourceType() } -func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint { - base := endpoint.Base{ +func (n *notificationEndpoint) base() endpoint.Base { + return endpoint.Base{ + ID: n.ID(), + OrgID: n.OrgID, Name: n.Name(), Description: n.description, Status: influxdb.TaskStatusActive, } +} + +func (n *notificationEndpoint) summarize() SummaryNotificationEndpoint { + base := n.base() if n.status != "" { base.Status = influxdb.Status(n.status) } sum := SummaryNotificationEndpoint{ LabelAssociations: toInfluxLabels(n.labels...), } + switch n.kind { case notificationKindHTTP: e := &endpoint.HTTP{ Base: base, URL: n.url, - Method: "POST", + Method: n.method, } - switch { - case n.password == "" && n.username == "" && n.token == "": + switch n.httpType { + case notificationHTTPAuthTypeNone: e.AuthMethod = notificationHTTPAuthTypeNone - case n.token != "": + case 
notificationHTTPAuthTypeBearer: e.AuthMethod = notificationHTTPAuthTypeBearer + e.Token = influxdb.SecretField{Value: &n.token} default: e.AuthMethod = notificationHTTPAuthTypeBasic + e.Password = influxdb.SecretField{Value: &n.password} + e.Username = influxdb.SecretField{Value: &n.username} } sum.NotificationEndpoint = e case notificationKindPagerDuty: sum.NotificationEndpoint = &endpoint.PagerDuty{ - Base: base, - ClientURL: n.url, + Base: base, + ClientURL: n.url, + RoutingKey: influxdb.SecretField{Value: &n.routingKey}, } case notificationKindSlack: - sum.NotificationEndpoint = &endpoint.Slack{ + e := &endpoint.Slack{ Base: base, URL: n.url, } + if n.token != "" { + e.Token = influxdb.SecretField{Value: &n.token} + } + sum.NotificationEndpoint = e } + sum.NotificationEndpoint.BackfillSecretKeys() return sum } +var validHTTPMethods = map[string]bool{ + "DELETE": true, + "GET": true, + "HEAD": true, + "OPTIONS": true, + "PATCH": true, + "POST": true, + "PUT": true, +} + func (n *notificationEndpoint) valid() []validationErr { var failures []validationErr if _, err := url.Parse(n.url); err != nil || n.url == "" { @@ -853,6 +956,13 @@ func (n *notificationEndpoint) valid() []validationErr { }) } case notificationKindHTTP: + if !validHTTPMethods[n.method] { + failures = append(failures, validationErr{ + Field: fieldNotificationEndpointHTTPMethod, + Msg: "http method must be a valid HTTP verb", + }) + } + switch n.httpType { case notificationHTTPAuthTypeBasic: if n.password == "" { @@ -891,6 +1001,16 @@ func (n *notificationEndpoint) valid() []validationErr { return failures } +type mapperNotificationEndpoints []*notificationEndpoint + +func (n mapperNotificationEndpoints) Association(i int) labelAssociater { + return n[i] +} + +func (n mapperNotificationEndpoints) Len() int { + return len(n) +} + const ( fieldTelegrafConfig = "config" ) @@ -905,6 +1025,10 @@ func (t *telegraf) ID() influxdb.ID { return t.config.ID } +func (t *telegraf) Labels() []*label { + return 
t.labels +} + func (t *telegraf) Name() string { return t.config.Name } @@ -924,6 +1048,16 @@ func (t *telegraf) summarize() SummaryTelegraf { } } +type mapperTelegrafs []*telegraf + +func (m mapperTelegrafs) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperTelegrafs) Len() int { + return len(m) +} + const ( fieldArgTypeConstant = "constant" fieldArgTypeMap = "map" @@ -957,6 +1091,10 @@ func (v *variable) Exists() bool { return v.existing != nil } +func (v *variable) Labels() []*label { + return v.labels +} + func (v *variable) Name() string { return v.name } @@ -969,7 +1107,7 @@ func (v *variable) shouldApply() bool { return v.existing == nil || v.existing.Description != v.Description || v.existing.Arguments == nil || - v.existing.Arguments.Type != v.Type + !reflect.DeepEqual(v.existing.Arguments, v.influxVarArgs()) } func (v *variable) summarize() SummaryVariable { @@ -1037,6 +1175,16 @@ func (v *variable) valid() []validationErr { return failures } +type mapperVariables []*variable + +func (m mapperVariables) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperVariables) Len() int { + return len(m) +} + const ( fieldDashCharts = "charts" ) @@ -1055,6 +1203,10 @@ func (d *dashboard) ID() influxdb.ID { return d.id } +func (d *dashboard) Labels() []*label { + return d.labels +} + func (d *dashboard) Name() string { return d.name } @@ -1087,6 +1239,16 @@ func (d *dashboard) summarize() SummaryDashboard { return iDash } +type mapperDashboards []*dashboard + +func (m mapperDashboards) Association(i int) labelAssociater { + return m[i] +} + +func (m mapperDashboards) Len() int { + return len(m) +} + const ( fieldChartAxes = "axes" fieldChartBinCount = "binCount" diff --git a/pkger/parser.go b/pkger/parser.go index 210db792a59..b9673b42bc5 100644 --- a/pkger/parser.go +++ b/pkger/parser.go @@ -548,10 +548,11 @@ func (p *Pkg) graphNotificationEndpoints() *parseErr { kind: nk.notificationKind, name: r.Name(), description: 
r.stringShort(fieldDescription), - httpType: strings.ToLower(r.stringShort(fieldType)), + method: strings.TrimSpace(strings.ToUpper(r.stringShort(fieldNotificationEndpointHTTPMethod))), + httpType: normStr(r.stringShort(fieldType)), password: r.stringShort(fieldNotificationEndpointPassword), routingKey: r.stringShort(fieldNotificationEndpointRoutingKey), - status: strings.ToLower(r.stringShort(fieldStatus)), + status: normStr(r.stringShort(fieldStatus)), token: r.stringShort(fieldNotificationEndpointToken), url: r.stringShort(fieldNotificationEndpointURL), username: r.stringShort(fieldNotificationEndpointUsername), @@ -589,9 +590,9 @@ func (p *Pkg) graphVariables() *parseErr { newVar := &variable{ name: r.Name(), Description: r.stringShort(fieldDescription), - Type: strings.ToLower(r.stringShort(fieldType)), + Type: normStr(r.stringShort(fieldType)), Query: strings.TrimSpace(r.stringShort(fieldQuery)), - Language: strings.ToLower(strings.TrimSpace(r.stringShort(fieldLanguage))), + Language: normStr(r.stringShort(fieldLanguage)), ConstValues: r.slcStr(fieldValues), MapValues: r.mapStrStr(fieldValues), } @@ -1225,3 +1226,7 @@ func IsParseErr(err error) bool { _, ok := err.(*parseErr) return ok } + +func normStr(s string) string { + return strings.TrimSpace(strings.ToLower(s)) +} diff --git a/pkger/parser_test.go b/pkger/parser_test.go index 65cafa08cda..3b471940e6a 100644 --- a/pkger/parser_test.go +++ b/pkger/parser_test.go @@ -2717,6 +2717,8 @@ spec: URL: "https://www.example.com/endpoint/basicauth", AuthMethod: "basic", Method: "POST", + Username: influxdb.SecretField{Key: "-username", Value: strPtr("secret username")}, + Password: influxdb.SecretField{Key: "-password", Value: strPtr("secret password")}, }, }, { @@ -2728,7 +2730,8 @@ spec: }, URL: "https://www.example.com/endpoint/bearerauth", AuthMethod: "bearer", - Method: "POST", + Method: "PUT", + Token: influxdb.SecretField{Key: "-token", Value: strPtr("secret token")}, }, }, { @@ -2740,7 +2743,7 @@ spec: }, 
URL: "https://www.example.com/endpoint/noneauth", AuthMethod: "none", - Method: "POST", + Method: "GET", }, }, { @@ -2750,7 +2753,8 @@ spec: Description: "pager duty desc", Status: influxdb.TaskStatusActive, }, - ClientURL: "http://localhost:8080/orgs/7167eb6719fa34e5/alert-history", + ClientURL: "http://localhost:8080/orgs/7167eb6719fa34e5/alert-history", + RoutingKey: influxdb.SecretField{Key: "-routing-key", Value: strPtr("secret routing-key")}, }, }, { @@ -2760,7 +2764,8 @@ spec: Description: "slack desc", Status: influxdb.TaskStatusActive, }, - URL: "https://hooks.slack.com/services/bip/piddy/boppidy", + URL: "https://hooks.slack.com/services/bip/piddy/boppidy", + Token: influxdb.SecretField{Key: "-token", Value: strPtr("tokenval")}, }, }, } @@ -2846,6 +2851,7 @@ spec: resources: - kind: NotificationEndpointHTTP name: name1 + method: GET `, }, }, @@ -2866,6 +2872,7 @@ spec: - kind: NotificationEndpointHTTP name: name1 type: none + method: POST url: d_____-_8**(*https://www.examples.coms `, }, @@ -2873,9 +2880,9 @@ spec: { kind: KindNotificationEndpointHTTP, resErr: testPkgResourceError{ - name: "bad url", + name: "missing http method", validationErrs: 1, - valFields: []string{fieldNotificationEndpointURL}, + valFields: []string{fieldNotificationEndpointHTTPMethod}, pkgStr: `apiVersion: 0.1.0 kind: Package meta: @@ -2887,16 +2894,16 @@ spec: - kind: NotificationEndpointHTTP name: name1 type: none - url: d_____-_8**(*https://www.examples.coms + url: http://example.com `, }, }, { kind: KindNotificationEndpointHTTP, resErr: testPkgResourceError{ - name: "missing basic username", + name: "invalid http method", validationErrs: 1, - valFields: []string{fieldNotificationEndpointUsername}, + valFields: []string{fieldNotificationEndpointHTTPMethod}, pkgStr: `apiVersion: 0.1.0 kind: Package meta: @@ -2907,18 +2914,18 @@ spec: resources: - kind: NotificationEndpointHTTP name: name1 - type: basic - url: example.com - password: password + type: none + method: GUT + url: 
http://example.com `, }, }, { kind: KindNotificationEndpointHTTP, resErr: testPkgResourceError{ - name: "missing basic password", + name: "missing basic username", validationErrs: 1, - valFields: []string{fieldNotificationEndpointPassword}, + valFields: []string{fieldNotificationEndpointUsername}, pkgStr: `apiVersion: 0.1.0 kind: Package meta: @@ -2931,16 +2938,17 @@ spec: name: name1 type: basic url: example.com - username: user + method: POST + password: password `, }, }, { kind: KindNotificationEndpointHTTP, resErr: testPkgResourceError{ - name: "missing basic password and username", + name: "missing basic password", validationErrs: 1, - valFields: []string{fieldNotificationEndpointPassword, fieldNotificationEndpointUsername}, + valFields: []string{fieldNotificationEndpointPassword}, pkgStr: `apiVersion: 0.1.0 kind: Package meta: @@ -2952,16 +2960,18 @@ spec: - kind: NotificationEndpointHTTP name: name1 type: basic + method: POST url: example.com + username: user `, }, }, { kind: KindNotificationEndpointHTTP, resErr: testPkgResourceError{ - name: "missing bearer token", + name: "missing basic password and username", validationErrs: 1, - valFields: []string{fieldNotificationEndpointToken}, + valFields: []string{fieldNotificationEndpointPassword, fieldNotificationEndpointUsername}, pkgStr: `apiVersion: 0.1.0 kind: Package meta: @@ -2972,7 +2982,8 @@ spec: resources: - kind: NotificationEndpointHTTP name: name1 - type: bearer + type: basic + method: POST url: example.com `, }, @@ -2980,9 +2991,9 @@ spec: { kind: KindNotificationEndpointHTTP, resErr: testPkgResourceError{ - name: "invalid http type", + name: "missing bearer token", validationErrs: 1, - valFields: []string{fieldType}, + valFields: []string{fieldNotificationEndpointToken}, pkgStr: `apiVersion: 0.1.0 kind: Package meta: @@ -2993,7 +3004,8 @@ spec: resources: - kind: NotificationEndpointHTTP name: name1 - type: threeve + type: bearer + method: GET url: example.com `, }, @@ -3015,6 +3027,7 @@ spec: - 
kind: NotificationEndpointHTTP name: name1 type: threeve + method: GET url: example.com `, }, @@ -3609,3 +3622,7 @@ func testfileRunner(t *testing.T, path string, testFn func(t *testing.T, pkg *Pk t.Run(tt.name, fn) } } + +func strPtr(s string) *string { + return &s +} diff --git a/pkger/service.go b/pkger/service.go index 3daaf4ba468..a58b1576a6e 100644 --- a/pkger/service.go +++ b/pkger/service.go @@ -26,12 +26,13 @@ type SVC interface { } type serviceOpt struct { - logger *zap.Logger - labelSVC influxdb.LabelService - bucketSVC influxdb.BucketService - dashSVC influxdb.DashboardService - teleSVC influxdb.TelegrafConfigStore - varSVC influxdb.VariableService + logger *zap.Logger + labelSVC influxdb.LabelService + bucketSVC influxdb.BucketService + dashSVC influxdb.DashboardService + endpointSVC influxdb.NotificationEndpointService + teleSVC influxdb.TelegrafConfigStore + varSVC influxdb.VariableService applyReqLimit int } @@ -60,6 +61,13 @@ func WithDashboardSVC(dashSVC influxdb.DashboardService) ServiceSetterFn { } } +// WithNoticationEndpointSVC sets the endpoint notification service. +func WithNoticationEndpointSVC(endpointSVC influxdb.NotificationEndpointService) ServiceSetterFn { + return func(opt *serviceOpt) { + opt.endpointSVC = endpointSVC + } +} + // WithLabelSVC sets the label service. 
func WithLabelSVC(labelSVC influxdb.LabelService) ServiceSetterFn { return func(opt *serviceOpt) { @@ -95,11 +103,12 @@ func WithApplyReqLimit(limit int) ServiceSetterFn { type Service struct { log *zap.Logger - labelSVC influxdb.LabelService - bucketSVC influxdb.BucketService - dashSVC influxdb.DashboardService - teleSVC influxdb.TelegrafConfigStore - varSVC influxdb.VariableService + labelSVC influxdb.LabelService + bucketSVC influxdb.BucketService + dashSVC influxdb.DashboardService + endpointSVC influxdb.NotificationEndpointService + teleSVC influxdb.TelegrafConfigStore + varSVC influxdb.VariableService applyReqLimit int } @@ -119,6 +128,7 @@ func NewService(opts ...ServiceSetterFn) *Service { bucketSVC: opt.bucketSVC, labelSVC: opt.labelSVC, dashSVC: opt.dashSVC, + endpointSVC: opt.endpointSVC, teleSVC: opt.teleSVC, varSVC: opt.varSVC, applyReqLimit: opt.applyReqLimit, @@ -501,6 +511,11 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ return Summary{}, Diff{}, err } + diffEndpoints, err := s.dryRunNotificationEndpoints(ctx, orgID, pkg) + if err != nil { + return Summary{}, Diff{}, err + } + diffVars, err := s.dryRunVariables(ctx, orgID, pkg) if err != nil { return Summary{}, Diff{}, err @@ -517,12 +532,13 @@ func (s *Service) DryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (Summ pkg.isVerified = true diff := Diff{ - Buckets: diffBuckets, - Dashboards: s.dryRunDashboards(ctx, orgID, pkg), - Labels: diffLabels, - LabelMappings: diffLabelMappings, - Telegrafs: s.dryRunTelegraf(ctx, orgID, pkg), - Variables: diffVars, + Buckets: diffBuckets, + Dashboards: s.dryRunDashboards(pkg), + Labels: diffLabels, + LabelMappings: diffLabelMappings, + NotificationEndpoints: diffEndpoints, + Telegrafs: s.dryRunTelegraf(pkg), + Variables: diffVars, } return pkg.Summary(), diff, parseErr } @@ -555,7 +571,7 @@ func (s *Service) dryRunBuckets(ctx context.Context, orgID influxdb.ID, pkg *Pkg return diffs, nil } -func (s *Service) 
dryRunDashboards(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffDashboard { +func (s *Service) dryRunDashboards(pkg *Pkg) []DiffDashboard { var diffs []DiffDashboard for _, d := range pkg.dashboards() { diffs = append(diffs, newDiffDashboard(d)) @@ -595,7 +611,45 @@ func (s *Service) dryRunLabels(ctx context.Context, orgID influxdb.ID, pkg *Pkg) return diffs, nil } -func (s *Service) dryRunTelegraf(ctx context.Context, orgID influxdb.ID, pkg *Pkg) []DiffTelegraf { +func (s *Service) dryRunNotificationEndpoints(ctx context.Context, orgID influxdb.ID, pkg *Pkg) ([]DiffNotificationEndpoint, error) { + existingEndpoints, _, err := s.endpointSVC.FindNotificationEndpoints(ctx, influxdb.NotificationEndpointFilter{ + OrgID: &orgID, + }) // grab em all + if err != nil { + return nil, err + } + + mExisting := make(map[string]influxdb.NotificationEndpoint) + for i := range existingEndpoints { + e := existingEndpoints[i] + mExisting[e.GetName()] = e + } + + mExistingToNew := make(map[string]DiffNotificationEndpoint) + endpoints := pkg.notificationEndpoints() + for i := range endpoints { + newEndpoint := endpoints[i] + + var existing influxdb.NotificationEndpoint + if iExisting, ok := mExisting[newEndpoint.Name()]; ok { + newEndpoint.existing = iExisting + existing = iExisting + } + mExistingToNew[newEndpoint.Name()] = newDiffNotificationEndpoint(newEndpoint, existing) + } + + var diffs []DiffNotificationEndpoint + for _, diff := range mExistingToNew { + diffs = append(diffs, diff) + } + sort.Slice(diffs, func(i, j int) bool { + return diffs[i].Name < diffs[j].Name + }) + + return diffs, nil +} + +func (s *Service) dryRunTelegraf(pkg *Pkg) []DiffTelegraf { var diffs []DiffTelegraf for _, t := range pkg.telegrafs() { diffs = append(diffs, newDiffTelegraf(t)) @@ -649,82 +703,47 @@ VarLoop: type ( labelMappingDiffFn func(labelID influxdb.ID, labelName string, isNew bool) + labelMappers interface { + Association(i int) labelAssociater + Len() int + } + labelAssociater 
interface { ID() influxdb.ID + Name() string + Labels() []*label ResourceType() influxdb.ResourceType Exists() bool } ) func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabelMapping, error) { - var diffs []DiffLabelMapping - for _, b := range pkg.buckets() { - err := s.dryRunResourceLabelMapping(ctx, b, b.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - if l, ok := pkg.mLabels[labelName]; ok { - l.setMapping(b, !isNew) - } - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: b.ResourceType(), - ResID: SafeID(b.ID()), - ResName: b.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err - } - } - - for _, d := range pkg.dashboards() { - err := s.dryRunResourceLabelMapping(ctx, d, d.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - pkg.mLabels[labelName].setMapping(d, false) - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: d.ResourceType(), - ResID: SafeID(d.ID()), - ResName: d.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err - } - } - - for _, t := range pkg.telegrafs() { - err := s.dryRunResourceLabelMapping(ctx, t, t.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - pkg.mLabels[labelName].setMapping(t, false) - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: t.ResourceType(), - ResID: SafeID(t.ID()), - ResName: t.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, - }) - }) - if err != nil { - return nil, err - } + mappers := []labelMappers{ + mapperBuckets(pkg.buckets()), + mapperDashboards(pkg.mDashboards), + mapperNotificationEndpoints(pkg.notificationEndpoints()), + mapperTelegrafs(pkg.mTelegrafs), + mapperVariables(pkg.variables()), } - for _, v := range pkg.variables() { - err := s.dryRunResourceLabelMapping(ctx, v, v.labels, func(labelID influxdb.ID, labelName string, isNew bool) { - 
pkg.mLabels[labelName].setMapping(v, !isNew) - diffs = append(diffs, DiffLabelMapping{ - IsNew: isNew, - ResType: v.ResourceType(), - ResID: SafeID(v.ID()), - ResName: v.Name(), - LabelID: SafeID(labelID), - LabelName: labelName, + var diffs []DiffLabelMapping + for _, mapper := range mappers { + for i := 0; i < mapper.Len(); i++ { + la := mapper.Association(i) + err := s.dryRunResourceLabelMapping(ctx, la, func(labelID influxdb.ID, labelName string, isNew bool) { + pkg.mLabels[labelName].setMapping(la, !isNew) + diffs = append(diffs, DiffLabelMapping{ + IsNew: isNew, + ResType: la.ResourceType(), + ResID: SafeID(la.ID()), + ResName: la.Name(), + LabelID: SafeID(labelID), + LabelName: labelName, + }) }) - }) - if err != nil { - return nil, err + if err != nil { + return nil, err + } } } @@ -749,9 +768,9 @@ func (s *Service) dryRunLabelMappings(ctx context.Context, pkg *Pkg) ([]DiffLabe return diffs, nil } -func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, labels []*label, mappingFn labelMappingDiffFn) error { +func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssociater, mappingFn labelMappingDiffFn) error { if !la.Exists() { - for _, l := range labels { + for _, l := range la.Labels() { mappingFn(l.ID(), l.Name(), true) } return nil @@ -770,7 +789,7 @@ func (s *Service) dryRunResourceLabelMapping(ctx context.Context, la labelAssoci return err } - pkgLabels := labelSlcToMap(labels) + pkgLabels := labelSlcToMap(la.Labels()) for _, l := range existingLabels { // should ignore any labels that are not specified in pkg mappingFn(l.ID, l.Name, false) @@ -806,30 +825,39 @@ func (s *Service) Apply(ctx context.Context, orgID influxdb.ID, pkg *Pkg) (sum S // each grouping here runs for its entirety, then returns an error that // is indicative of running all appliers provided. For instance, the labels - // may have 1 label fail and one of the buckets fails. 
The errors aggregate so - // the caller will be informed of both the failed label and the failed bucket. + // may have 1 variable fail and one of the buckets fails. The errors aggregate so + // the caller will be informed of both the failed label variable the failed bucket. // the groupings here allow for steps to occur before exiting. The first step is - // adding the primary resources. Here we get all the errors associated with them. + // adding the dependencies, resources that are associated by other resources. Then the + // primary resources. Here we get all the errors associated with them. // If those are all good, then we run the secondary(dependent) resources which // rely on the primary resources having been created. - primary := []applier{ - // primary resources - s.applyLabels(pkg.labels()), - s.applyVariables(pkg.variables()), - s.applyBuckets(pkg.buckets()), - s.applyDashboards(pkg.dashboards()), - s.applyTelegrafs(pkg.telegrafs()), - } - if err := coordinator.runTilEnd(ctx, orgID, primary...); err != nil { - return Summary{}, err + appliers := [][]applier{ + // want to make all dependencies for belwo donezo before moving on to resources + // that have dependencies on lables + { + // deps for primary resources + s.applyLabels(pkg.labels()), + }, + { + // primary resources + s.applyVariables(pkg.variables()), + s.applyBuckets(pkg.buckets()), + s.applyDashboards(pkg.dashboards()), + s.applyNotificationEndpoints(pkg.notificationEndpoints()), + s.applyTelegrafs(pkg.telegrafs()), + }, } - // secondary grouping relies on state being available from the primary run. - // the first example here is label mappings which relies on ids provided - // from the newly created resources in primary. 
- secondary := []applier{ - s.applyLabelMappings(pkg.labelMappings()), + for _, group := range appliers { + if err := coordinator.runTilEnd(ctx, orgID, group...); err != nil { + return Summary{}, err + } } + + // secondary resources + // this last grouping relies on the above 2 steps having completely successfully + secondary := []applier{s.applyLabelMappings(pkg.labelMappings())} if err := coordinator.runTilEnd(ctx, orgID, secondary...); err != nil { return Summary{}, err } @@ -1098,11 +1126,7 @@ func (s *Service) applyLabel(ctx context.Context, l label) (influxdb.Label, erro return *updatedlabel, nil } - influxLabel := influxdb.Label{ - OrgID: l.OrgID, - Name: l.Name(), - Properties: l.properties(), - } + influxLabel := l.toInfluxLabel() err := s.labelSVC.CreateLabel(ctx, &influxLabel) if err != nil { return influxdb.Label{}, err @@ -1111,6 +1135,94 @@ func (s *Service) applyLabel(ctx context.Context, l label) (influxdb.Label, erro return influxLabel, nil } +func (s *Service) applyNotificationEndpoints(endpoints []*notificationEndpoint) applier { + const resource = "notification_endpoints" + + mutex := new(doMutex) + rollbackEndpoints := make([]*notificationEndpoint, 0, len(endpoints)) + + createFn := func(ctx context.Context, i int, orgID influxdb.ID) *applyErrBody { + var endpoint notificationEndpoint + mutex.Do(func() { + endpoints[i].OrgID = orgID + endpoint = *endpoints[i] + }) + + influxEndpoint, err := s.applyNotificationEndpoint(ctx, endpoint) + if err != nil { + return &applyErrBody{ + name: endpoint.Name(), + msg: err.Error(), + } + } + + mutex.Do(func() { + endpoints[i].id = influxEndpoint.GetID() + rollbackEndpoints = append(rollbackEndpoints, endpoints[i]) + }) + + return nil + } + + return applier{ + creater: creater{ + entries: len(endpoints), + fn: createFn, + }, + rollbacker: rollbacker{ + resource: resource, + fn: func() error { + return s.rollbackNotificationEndpoints(rollbackEndpoints) + }, + }, + } +} + +func (s *Service) 
applyNotificationEndpoint(ctx context.Context, e notificationEndpoint) (influxdb.NotificationEndpoint, error) { + if e.existing != nil { + // stub out userID since we're always using hte http client which will fill it in for us with the token + // feels a bit broken that is required. + // TODO: look into this userID requirement + updatedEndpoint, err := s.endpointSVC.UpdateNotificationEndpoint(ctx, e.ID(), e.existing, 0) + if err != nil { + return nil, err + } + return updatedEndpoint, nil + } + + actual := e.summarize().NotificationEndpoint + err := s.endpointSVC.CreateNotificationEndpoint(ctx, actual, 0) + if err != nil { + return nil, err + } + + return actual, nil +} + +func (s *Service) rollbackNotificationEndpoints(endpoints []*notificationEndpoint) error { + var errs []string + for _, e := range endpoints { + if e.existing == nil { + _, _, err := s.endpointSVC.DeleteNotificationEndpoint(context.Background(), e.ID()) + if err != nil { + errs = append(errs, e.ID().String()) + } + continue + } + + _, err := s.endpointSVC.UpdateNotificationEndpoint(context.Background(), e.ID(), e.existing, 0) + if err != nil { + errs = append(errs, e.ID().String()) + } + } + + if len(errs) > 0 { + return fmt.Errorf(`notication_endpoint_ids=[%s] err="unable to delete"`, strings.Join(errs, ", ")) + } + + return nil +} + func (s *Service) applyTelegrafs(teles []*telegraf) applier { const resource = "telegrafs" @@ -1275,7 +1387,7 @@ func (s *Service) applyLabelMappings(labelMappings []SummaryLabelMapping) applie err := s.labelSVC.CreateLabelMapping(ctx, &mapping.LabelMapping) if err != nil { return &applyErrBody{ - name: fmt.Sprintf("%s:%s", mapping.ResourceID, mapping.LabelID), + name: fmt.Sprintf("%s:%s:%s", mapping.ResourceType, mapping.ResourceID, mapping.LabelID), msg: err.Error(), } } diff --git a/pkger/service_test.go b/pkger/service_test.go index a47eb83bd5a..dd4d4c23026 100644 --- a/pkger/service_test.go +++ b/pkger/service_test.go @@ -10,6 +10,7 @@ import ( 
"github.com/influxdata/influxdb" "github.com/influxdata/influxdb/mock" + "github.com/influxdata/influxdb/notification/endpoint" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -18,11 +19,12 @@ import ( func TestService(t *testing.T) { newTestService := func(opts ...ServiceSetterFn) *Service { opt := serviceOpt{ - bucketSVC: mock.NewBucketService(), - dashSVC: mock.NewDashboardService(), - labelSVC: mock.NewLabelService(), - teleSVC: mock.NewTelegrafConfigStore(), - varSVC: mock.NewVariableService(), + bucketSVC: mock.NewBucketService(), + dashSVC: mock.NewDashboardService(), + labelSVC: mock.NewLabelService(), + endpointSVC: mock.NewNotificationEndpointService(), + teleSVC: mock.NewTelegrafConfigStore(), + varSVC: mock.NewVariableService(), } for _, o := range opts { o(&opt) @@ -32,6 +34,7 @@ func TestService(t *testing.T) { WithBucketSVC(opt.bucketSVC), WithDashboardSVC(opt.dashSVC), WithLabelSVC(opt.labelSVC), + WithNoticationEndpointSVC(opt.endpointSVC), WithTelegrafSVC(opt.teleSVC), WithVariableSVC(opt.varSVC), ) @@ -40,7 +43,7 @@ func TestService(t *testing.T) { t.Run("DryRun", func(t *testing.T) { t.Run("buckets", func(t *testing.T) { t.Run("single bucket updated", func(t *testing.T) { - testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/bucket.yml", func(t *testing.T, pkg *Pkg) { fakeBktSVC := mock.NewBucketService() fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) { return &influxdb.Bucket{ @@ -51,7 +54,7 @@ func TestService(t *testing.T) { RetentionPeriod: 30 * time.Hour, }, nil } - svc := newTestService(WithBucketSVC(fakeBktSVC), WithLabelSVC(mock.NewLabelService())) + svc := newTestService(WithBucketSVC(fakeBktSVC)) _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) require.NoError(t, err) @@ -75,12 +78,12 @@ func TestService(t *testing.T) { }) t.Run("single bucket new", 
func(t *testing.T) { - testfileRunner(t, "testdata/bucket", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/bucket.json", func(t *testing.T, pkg *Pkg) { fakeBktSVC := mock.NewBucketService() fakeBktSVC.FindBucketByNameFn = func(_ context.Context, orgID influxdb.ID, name string) (*influxdb.Bucket, error) { return nil, errors.New("not found") } - svc := newTestService(WithBucketSVC(fakeBktSVC), WithLabelSVC(mock.NewLabelService())) + svc := newTestService(WithBucketSVC(fakeBktSVC)) _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) require.NoError(t, err) @@ -101,7 +104,7 @@ func TestService(t *testing.T) { t.Run("labels", func(t *testing.T) { t.Run("two labels updated", func(t *testing.T) { - testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/label.json", func(t *testing.T, pkg *Pkg) { fakeLabelSVC := mock.NewLabelService() fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) { return []*influxdb.Label{ @@ -144,7 +147,7 @@ func TestService(t *testing.T) { }) t.Run("two labels created", func(t *testing.T) { - testfileRunner(t, "testdata/label", func(t *testing.T, pkg *Pkg) { + testfileRunner(t, "testdata/label.yml", func(t *testing.T, pkg *Pkg) { fakeLabelSVC := mock.NewLabelService() fakeLabelSVC.FindLabelsFn = func(_ context.Context, filter influxdb.LabelFilter) ([]*influxdb.Label, error) { return nil, errors.New("no labels found") @@ -173,6 +176,69 @@ func TestService(t *testing.T) { }) }) + t.Run("notification endpoints", func(t *testing.T) { + testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) { + fakeEndpointSVC := mock.NewNotificationEndpointService() + existing := &endpoint.HTTP{ + Base: endpoint.Base{ + ID: 1, + Name: "http_none_auth_notification_endpoint", + Description: "old desc", + Status: influxdb.TaskStatusInactive, + }, + Method: "POST", + AuthMethod: "none", + URL: 
"https://www.example.com/endpoint/old", + } + fakeEndpointSVC.FindNotificationEndpointsF = func(ctx context.Context, f influxdb.NotificationEndpointFilter, opt ...influxdb.FindOptions) ([]influxdb.NotificationEndpoint, int, error) { + return []influxdb.NotificationEndpoint{existing}, 1, nil + } + + svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC)) + + _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) + require.NoError(t, err) + + require.Len(t, diff.NotificationEndpoints, 5) + + var ( + newEndpoints []DiffNotificationEndpoint + existingEndpoints []DiffNotificationEndpoint + ) + for _, e := range diff.NotificationEndpoints { + if e.Old != nil { + existingEndpoints = append(existingEndpoints, e) + continue + } + newEndpoints = append(newEndpoints, e) + } + require.Len(t, newEndpoints, 4) + require.Len(t, existingEndpoints, 1) + + expected := DiffNotificationEndpoint{ + ID: SafeID(1), + Name: "http_none_auth_notification_endpoint", + Old: &DiffNotificationEndpointValues{ + NotificationEndpoint: existing, + }, + New: DiffNotificationEndpointValues{ + NotificationEndpoint: &endpoint.HTTP{ + Base: endpoint.Base{ + ID: 1, + Name: "http_none_auth_notification_endpoint", + Description: "http none auth desc", + Status: influxdb.TaskStatusActive, + }, + AuthMethod: "none", + Method: "GET", + URL: "https://www.example.com/endpoint/noneauth", + }, + }, + } + assert.Equal(t, expected, existingEndpoints[0]) + }) + }) + t.Run("variables", func(t *testing.T) { testfileRunner(t, "testdata/variables", func(t *testing.T, pkg *Pkg) { fakeVarSVC := mock.NewVariableService() @@ -185,8 +251,7 @@ func TestService(t *testing.T) { }, }, nil } - fakeLabelSVC := mock.NewLabelService() // ignore mappings for now - svc := newTestService(WithLabelSVC(fakeLabelSVC), WithVariableSVC(fakeVarSVC)) + svc := newTestService(WithVariableSVC(fakeVarSVC)) _, diff, err := svc.DryRun(context.TODO(), influxdb.ID(100), pkg) require.NoError(t, err) @@ -648,6 +713,77 @@ func 
TestService(t *testing.T) { }) + t.Run("notification endpoints", func(t *testing.T) { + t.Run("successfully creates pkg of endpoints", func(t *testing.T) { + testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) { + fakeEndpointSVC := mock.NewNotificationEndpointService() + fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error { + nr.SetID(influxdb.ID(fakeEndpointSVC.CreateNotificationEndpointCalls.Count() + 1)) + return nil + } + + svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC)) + + orgID := influxdb.ID(9000) + + sum, err := svc.Apply(context.TODO(), orgID, pkg) + require.NoError(t, err) + + require.Len(t, sum.NotificationEndpoints, 5) + + containsWithID := func(t *testing.T, name string) { + for _, actual := range sum.NotificationEndpoints { + if actual.GetID() == 0 { + assert.NotZero(t, actual.GetID()) + } + if actual.GetName() == name { + return + } + } + assert.Fail(t, "did not find notification by name: "+name) + } + + expectedNames := []string{ + "http_basic_auth_notification_endpoint", + "http_bearer_auth_notification_endpoint", + "http_none_auth_notification_endpoint", + "pager_duty_notification_endpoint", + "slack_notification_endpoint", + } + for _, expectedName := range expectedNames { + containsWithID(t, expectedName) + } + }) + }) + + t.Run("rolls back all created notifications on an error", func(t *testing.T) { + testfileRunner(t, "testdata/notification_endpoint.yml", func(t *testing.T, pkg *Pkg) { + fakeEndpointSVC := mock.NewNotificationEndpointService() + fakeEndpointSVC.CreateNotificationEndpointF = func(ctx context.Context, nr influxdb.NotificationEndpoint, userID influxdb.ID) error { + nr.SetID(influxdb.ID(fakeEndpointSVC.CreateNotificationEndpointCalls.Count() + 1)) + if fakeEndpointSVC.CreateNotificationEndpointCalls.Count() == 5 { + return errors.New("hit that kill count") + } + return nil + } + + // create some 
dupes + for name, endpoint := range pkg.mNotificationEndpoints { + pkg.mNotificationEndpoints["copy"+name] = endpoint + } + + svc := newTestService(WithNoticationEndpointSVC(fakeEndpointSVC)) + + orgID := influxdb.ID(9000) + + _, err := svc.Apply(context.TODO(), orgID, pkg) + require.Error(t, err) + + assert.GreaterOrEqual(t, fakeEndpointSVC.DeleteNotificationEndpointCalls.Count(), 5) + }) + }) + }) + t.Run("telegrafs", func(t *testing.T) { t.Run("successfuly creates", func(t *testing.T) { testfileRunner(t, "testdata/telegraf.yml", func(t *testing.T, pkg *Pkg) { diff --git a/pkger/testdata/notification_endpoint.json b/pkger/testdata/notification_endpoint.json index 8f6f4cfa68d..e8ca2d342bf 100644 --- a/pkger/testdata/notification_endpoint.json +++ b/pkger/testdata/notification_endpoint.json @@ -30,6 +30,7 @@ "kind": "NotificationEndpointHTTP", "name": "http_none_auth_notification_endpoint", "description": "http none auth desc", + "method": "GET", "type": "none", "url": "https://www.example.com/endpoint/noneauth", "status": "active", @@ -44,6 +45,7 @@ "kind": "NotificationEndpointHTTP", "name": "http_basic_auth_notification_endpoint", "description": "http basic auth desc", + "method": "POST", "type": "basic", "url": "https://www.example.com/endpoint/basicauth", "username": "secret username", @@ -61,6 +63,7 @@ "name": "http_bearer_auth_notification_endpoint", "description": "http bearer auth desc", "type": "bearer", + "method": "PUT", "url": "https://www.example.com/endpoint/bearerauth", "token": "secret token", "associations": [ diff --git a/pkger/testdata/notification_endpoint.yml b/pkger/testdata/notification_endpoint.yml index d77521a7bc2..b9052935654 100644 --- a/pkger/testdata/notification_endpoint.yml +++ b/pkger/testdata/notification_endpoint.yml @@ -21,6 +21,7 @@ spec: name: http_none_auth_notification_endpoint type: none description: http none auth desc + method: get url: https://www.example.com/endpoint/noneauth status: active associations: @@ -30,6 +31,7 
@@ spec: name: http_basic_auth_notification_endpoint description: http basic auth desc type: basic + method: pOsT url: https://www.example.com/endpoint/basicauth username: "secret username" password: "secret password" @@ -41,6 +43,7 @@ spec: name: http_bearer_auth_notification_endpoint description: http bearer auth desc type: bearer + method: puT url: https://www.example.com/endpoint/bearerauth token: "secret token" associations: diff --git a/task.go b/task.go index a6f3c4e27d6..7453446962f 100644 --- a/task.go +++ b/task.go @@ -84,7 +84,8 @@ type Run struct { ID ID `json:"id,omitempty"` TaskID ID `json:"taskID"` Status string `json:"status"` - ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the time the task is scheduled to run at + ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the Now time used in the task's query + RunAt time.Time `json:"runAt"` // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset StartedAt time.Time `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task FinishedAt time.Time `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task RequestedAt time.Time `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index e3501b4ae08..44f12050229 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -386,7 +386,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error { case scheduledForField: scheduled, err := time.Parse(time.RFC3339, cr.Strings(j).ValueString(i)) if err != nil { - re.log.Info("Failed to parse scheduledAt time", zap.Error(err)) + re.log.Info("Failed to parse scheduledFor time", zap.Error(err)) continue } r.ScheduledFor = scheduled.UTC() diff --git a/task/backend/coordinator/task_coordinator.go 
b/task/backend/coordinator/task_coordinator.go index dd0d3fdb177..00db6b0cf11 100644 --- a/task/backend/coordinator/task_coordinator.go +++ b/task/backend/coordinator/task_coordinator.go @@ -40,6 +40,7 @@ type CoordinatorOption func(*TaskCoordinator) type SchedulableTask struct { *influxdb.Task sch scheduler.Schedule + lsc time.Time } func (t SchedulableTask) ID() scheduler.ID { @@ -58,14 +59,7 @@ func (t SchedulableTask) Offset() time.Duration { // LastScheduled parses the task's LatestCompleted value as a Time object func (t SchedulableTask) LastScheduled() time.Time { - if !t.LatestScheduled.IsZero() { - return t.LatestScheduled - } - if !t.LatestCompleted.IsZero() { - return t.LatestCompleted - } - - return t.CreatedAt + return t.lsc } func WithLimitOpt(i int) CoordinatorOption { @@ -81,13 +75,20 @@ func NewSchedulableTask(task *influxdb.Task) (SchedulableTask, error) { return SchedulableTask{}, errors.New("invalid cron or every") } effCron := task.EffectiveCron() - sch, err := scheduler.NewSchedule(effCron) + ts := task.CreatedAt + if !task.LatestScheduled.IsZero() { + ts = task.LatestScheduled + } else if !task.LatestCompleted.IsZero() { + ts = task.LatestCompleted + } + var sch scheduler.Schedule + var err error + sch, ts, err = scheduler.NewSchedule(effCron, ts) if err != nil { return SchedulableTask{}, err } - t := SchedulableTask{Task: task, sch: sch} - return t, nil + return SchedulableTask{Task: task, sch: sch, lsc: ts}, nil } func NewCoordinator(log *zap.Logger, scheduler scheduler.Scheduler, executor Executor, opts ...CoordinatorOption) *TaskCoordinator { diff --git a/task/backend/coordinator/task_coordinator_test.go b/task/backend/coordinator/task_coordinator_test.go index c24c0e5ccb9..ebd1cf939ba 100644 --- a/task/backend/coordinator/task_coordinator_test.go +++ b/task/backend/coordinator/task_coordinator_test.go @@ -101,8 +101,8 @@ func Test_Coordinator_Scheduler_Methods(t *testing.T) { three = influxdb.ID(3) now = time.Now().UTC() - taskOne = 
&influxdb.Task{ID: one, CreatedAt: now, Cron: "* * * * *"} - taskTwo = &influxdb.Task{ID: two, Status: "active", CreatedAt: now, Cron: "* * * * *"} + taskOne = &influxdb.Task{ID: one, CreatedAt: now, Cron: "@every 1s"} + taskTwo = &influxdb.Task{ID: two, Status: "active", CreatedAt: now, Cron: "@every 1m"} taskTwoInactive = &influxdb.Task{ID: two, Status: "inactive", CreatedAt: now, Cron: "* * * * *"} taskThreeOriginal = &influxdb.Task{ ID: three, @@ -124,15 +124,33 @@ func Test_Coordinator_Scheduler_Methods(t *testing.T) { if err != nil { t.Fatal(err) } + if schedulableT.LastScheduled().IsZero() { + t.Fatal("expected task LatestScheduled to not be zero but it was") + } + if schedulableT.LastScheduled() != time.Now().UTC().Truncate(time.Second) { + t.Fatalf("expected task LatestScheduled to be properly truncated but it was %s instead\n", schedulableT.LastScheduled()) + } schedulableTaskTwo, err := NewSchedulableTask(taskTwo) if err != nil { t.Fatal(err) } + if schedulableTaskTwo.LastScheduled().IsZero() { + t.Fatal("expected task LatestScheduled to not be zero but it was") + } + if schedulableTaskTwo.LastScheduled() != time.Now().UTC().Truncate(time.Minute) { + t.Fatalf("expected task LatestScheduled to be properly truncated but it was %s instead\n", schedulableT.LastScheduled()) + } schedulableTaskThree, err := NewSchedulableTask(taskThreeNew) if err != nil { t.Fatal(err) } + if schedulableTaskThree.LastScheduled().IsZero() { + t.Fatal("expected task LatestScheduled to not be zero but it was") + } + if schedulableTaskThree.LastScheduled() != time.Now().UTC().Truncate(time.Second) { + t.Fatalf("expected task LatestScheduled to be properly truncated but it was %s instead\n", schedulableT.LastScheduled()) + } runOne = &influxdb.Run{ ID: one, diff --git a/task/backend/executor/executor_metrics.go b/task/backend/executor/executor_metrics.go index 92ff7d82935..4e345c77cb2 100644 --- a/task/backend/executor/executor_metrics.go +++ 
b/task/backend/executor/executor_metrics.go @@ -17,6 +17,7 @@ type ExecutorMetrics struct { manualRunsCounter *prometheus.CounterVec resumeRunsCounter *prometheus.CounterVec unrecoverableCounter *prometheus.CounterVec + runLatency *prometheus.HistogramVec } type runCollector struct { @@ -83,6 +84,13 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics { Name: "resume_runs_counter", Help: "Total number of runs resumed by task ID", }, []string{"taskID"}), + + runLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "run_latency_seconds", + Help: "Records the latency between the time the run was due to run and the time the task started execution, by task type", + }, []string{"task_type"}), } } @@ -122,13 +130,17 @@ func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector { em.manualRunsCounter, em.resumeRunsCounter, em.unrecoverableCounter, + em.runLatency, } } // StartRun store the delta time between when a run is due to start and actually starting. -func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration) { +func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration, runLatency time.Duration) { em.queueDelta.WithLabelValues(task.Type, "all").Observe(queueDelta.Seconds()) em.queueDelta.WithLabelValues("", task.ID.String()).Observe(queueDelta.Seconds()) + + // schedule interval duration = (time task was scheduled to run) - (time it actually ran) + em.runLatency.WithLabelValues(task.Type).Observe(runLatency.Seconds()) } // FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID. 
diff --git a/task/backend/executor/limits_test.go b/task/backend/executor/limits_test.go index e9af710ea6d..dd8dd1228f1 100644 --- a/task/backend/executor/limits_test.go +++ b/task/backend/executor/limits_test.go @@ -16,15 +16,15 @@ var ( func TestTaskConcurrency(t *testing.T) { tes := taskExecutorSystem(t) te := tes.ex - r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second)) + r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second), time.Now()) if err != nil { t.Fatal(err) } - r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second)) + r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second), time.Now()) if err != nil { t.Fatal(err) } - r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second)) + r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second), time.Now()) if err != nil { t.Fatal(err) } diff --git a/task/backend/executor/task_executor.go b/task/backend/executor/task_executor.go index 725e0783170..b4232607230 100644 --- a/task/backend/executor/task_executor.go +++ b/task/backend/executor/task_executor.go @@ -97,20 +97,20 @@ func (e *TaskExecutor) SetLimitFunc(l LimitFunc) { } // Execute is a executor to satisfy the needs of tasks -func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error { - _, err := e.PromisedExecute(ctx, id, scheduledAt) +func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) error { + _, err := e.PromisedExecute(ctx, id, scheduledFor, runAt) return err } -// PromisedExecute begins execution for the tasks id with a specific scheduledAt time. 
-// When we execute we will first build a run for the scheduledAt time, +// PromisedExecute begins execution for the tasks id with a specific scheduledFor time. +// When we execute we will first build a run for the scheduledFor time, // We then want to add to the queue anything that was manually queued to run. // If the queue is full the call to execute should hang and apply back pressure to the caller // We then start a worker to work the newly queued jobs. -func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) (Promise, error) { +func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) (Promise, error) { iid := influxdb.ID(id) // create a run - p, err := e.createRun(ctx, iid, scheduledAt) + p, err := e.createRun(ctx, iid, scheduledFor, runAt) if err != nil { return nil, err } @@ -154,8 +154,8 @@ func (e *TaskExecutor) ResumeCurrentRun(ctx context.Context, id influxdb.ID, run return nil, influxdb.ErrRunNotFound } -func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*promise, error) { - r, err := e.tcs.CreateRun(ctx, id, scheduledAt.UTC()) +func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledFor time.Time, runAt time.Time) (*promise, error) { + r, err := e.tcs.CreateRun(ctx, id, scheduledFor.UTC(), runAt.UTC()) if err != nil { return nil, err } @@ -310,7 +310,7 @@ func (w *worker) start(p *promise) { w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), backend.RunStarted) // add to metrics - w.te.metrics.StartRun(p.task, time.Since(p.createdAt)) + w.te.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt)) p.startedAt = time.Now() } diff --git a/task/backend/executor/task_executor_test.go b/task/backend/executor/task_executor_test.go index 7b797924f39..c275b23e8a0 100644 --- a/task/backend/executor/task_executor_test.go +++ 
b/task/backend/executor/task_executor_test.go @@ -71,7 +71,7 @@ func testQuerySuccess(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -86,6 +86,10 @@ func testQuerySuccess(t *testing.T) { t.Fatal("promise and run dont match") } + if run.RunAt != time.Unix(126, 0).UTC() { + t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt) + } + tes.svc.WaitForQueryLive(t, script) tes.svc.SucceedQuery(script) @@ -113,7 +117,7 @@ func testQueryFailure(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -196,7 +200,7 @@ func testResumingRun(t *testing.T) { t.Fatal(err) } - stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0)) + stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -239,7 +243,7 @@ func testWorkerLimit(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -281,7 +285,7 @@ func testLimitFunc(t *testing.T) { return nil }) - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -317,7 +321,7 @@ func testMetrics(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, 
scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -383,6 +387,15 @@ func testMetrics(t *testing.T) { t.Fatalf("expected 1 manual run, got %v", got) } + m = promtest.MustFindMetric(t, mg, "task_executor_run_latency_seconds", map[string]string{"task_type": ""}) + if got := *m.Histogram.SampleCount; got != 1 { + t.Fatalf("expected to count 1 run latency metric, got %v", got) + } + + if got := *m.Histogram.SampleSum; got <= 100 { + t.Fatalf("expected run latency metric to be very large, got %v", got) + } + } func testIteratorFailure(t *testing.T) { @@ -403,7 +416,7 @@ func testIteratorFailure(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -447,7 +460,7 @@ func testErrorHandling(t *testing.T) { forcedErr := errors.New("could not find bucket") tes.svc.FailNextQuery(forcedErr) - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } diff --git a/task/backend/scheduler/scheduler.go b/task/backend/scheduler/scheduler.go index 9fe6febf3f8..c0e7fb8485e 100644 --- a/task/backend/scheduler/scheduler.go +++ b/task/backend/scheduler/scheduler.go @@ -2,9 +2,11 @@ package scheduler import ( "context" + "strings" "time" "github.com/influxdata/cron" + "github.com/influxdata/influxdb/task/options" ) // ID duplicates the influxdb ID so users of the scheduler don't have to @@ -19,7 +21,7 @@ type Executor interface { // Errors returned from the execute request imply that this attempt has failed and // should be put back in scheduler and re executed at a alter time. We will add scheduler specific errors // so the time can be configurable. 
- Execute(ctx context.Context, id ID, scheduledAt time.Time) error + Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error } // Schedulable is the interface that encapsulates work that @@ -50,10 +52,37 @@ type SchedulableService interface { UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error } -// NewSchedule takes a cron string -func NewSchedule(c string) (Schedule, error) { - sch, err := cron.ParseUTC(c) - return Schedule{cron: sch}, err +func NewSchedule(unparsed string, lastScheduledAt time.Time) (Schedule, time.Time, error) { + lastScheduledAt = lastScheduledAt.UTC().Truncate(time.Second) + c, err := cron.ParseUTC(unparsed) + if err != nil { + return Schedule{}, lastScheduledAt, err + } + + unparsed = strings.TrimSpace(unparsed) + + // Align create to the hour/minute + if strings.HasPrefix(unparsed, "@every ") { + everyString := strings.TrimSpace(strings.TrimPrefix(unparsed, "@every ")) + every := options.Duration{} + err := every.Parse(everyString) + if err != nil { + // We cannot align a invalid time + return Schedule{c}, lastScheduledAt, nil + } + + // drop nanoseconds + lastScheduledAt = time.Unix(lastScheduledAt.UTC().Unix(), 0).UTC() + everyDur, err := every.DurationFrom(lastScheduledAt) + if err != nil { + return Schedule{c}, lastScheduledAt, nil + } + + // and align + lastScheduledAt = lastScheduledAt.Truncate(everyDur).Truncate(time.Second) + } + + return Schedule{c}, lastScheduledAt, err } // Schedule is an object a valid schedule of runs diff --git a/task/backend/scheduler/scheduler_test.go b/task/backend/scheduler/scheduler_test.go index fa73a9b922d..f8af52c829a 100644 --- a/task/backend/scheduler/scheduler_test.go +++ b/task/backend/scheduler/scheduler_test.go @@ -1,17 +1,20 @@ -package scheduler // can +package scheduler import ( "context" + "reflect" "sync" "testing" "time" + "github.com/influxdata/cron" + "github.com/benbjohnson/clock" ) type mockExecutor struct { sync.Mutex - fn func(l *sync.Mutex, 
ctx context.Context, id ID, scheduledAt time.Time) + fn func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) Err error } @@ -36,12 +39,12 @@ func (s mockSchedulable) LastScheduled() time.Time { return s.lastScheduled } -func (e *mockExecutor) Execute(ctx context.Context, id ID, scheduledAt time.Time) error { +func (e *mockExecutor) Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error { done := make(chan struct{}, 1) select { case <-ctx.Done(): default: - e.fn(&sync.Mutex{}, ctx, id, scheduledAt) + e.fn(&sync.Mutex{}, ctx, id, scheduledFor) done <- struct{}{} } return nil @@ -57,14 +60,124 @@ func (m *mockSchedulableService) UpdateLastScheduled(ctx context.Context, id ID, } func TestSchedule_Next(t *testing.T) { + t.Run("@every fires on appropriate boundaries", func(t *testing.T) { + t.Run("@every 1m", func(t *testing.T) { + mockTime := clock.NewMock() + mockTime.Set(time.Now()) + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: + } + }} + sch, _, err := NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { + return nil + }}, + WithTime(mockTime), + WithMaxConcurrentWorkers(20)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, ts, err := NewSchedule("@every 1m", mockTime.Now().UTC()) + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts}) + if err != nil { + t.Fatal(err) + } + go func() { + sch.mu.Lock() + mockTime.Set(mockTime.Now().UTC().Add(17 * time.Minute)) + sch.mu.Unlock() + }() + + after := time.After(6 * time.Second) + oldCheckC := ts + for i := 0; i < 16; i++ { + select { + case checkC := <-c: + if checkC.Sub(oldCheckC) != time.Minute { + t.Fatalf("task didn't fire on correct interval fired on %s 
interval", checkC.Sub(oldCheckC)) + } + if !checkC.Truncate(time.Minute).Equal(checkC) { + t.Fatalf("task didn't fire at the correct time boundary") + } + oldCheckC = checkC + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) + } + } + }) + t.Run("@every 1h", func(t *testing.T) { + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: + } + }} + mockTime := clock.NewMock() + mockTime.Set(time.Now()) + sch, _, err := NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { + return nil + }}, + WithTime(mockTime), + WithMaxConcurrentWorkers(20)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, ts, err := NewSchedule("@every 1h", mockTime.Now().UTC()) + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts}) + if err != nil { + t.Fatal(err) + } + go func() { + sch.mu.Lock() + mockTime.Set(mockTime.Now().UTC().Add(17 * time.Hour)) + sch.mu.Unlock() + }() + + after := time.After(6 * time.Second) + oldCheckC := ts + for i := 0; i < 16; i++ { + select { + case checkC := <-c: + if checkC.Sub(oldCheckC) != time.Hour { + t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC)) + } + if !checkC.Truncate(time.Hour).Equal(checkC) { + t.Fatalf("task didn't fire at the correct time boundary") + } + oldCheckC = checkC + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) + } + } + }) + }) t.Run("fires properly with non-mocked time", func(t *testing.T) { now := time.Now() c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx 
context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: default: t.Errorf("called the executor too many times") } @@ -79,7 +192,7 @@ func TestSchedule_Next(t *testing.T) { t.Fatal(err) } defer sch.Stop() - schedule, err := NewSchedule("* * * * * * *") + schedule, _, err := NewSchedule("* * * * * * *", time.Time{}) if err != nil { t.Fatal(err) } @@ -99,11 +212,11 @@ func TestSchedule_Next(t *testing.T) { mockTime := clock.NewMock() mockTime.Set(time.Now()) c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: default: t.Errorf("called the executor too many times") } @@ -119,7 +232,7 @@ func TestSchedule_Next(t *testing.T) { t.Fatal(err) } defer sch.Stop() - schedule, err := NewSchedule("* * * * * * *") + schedule, _, err := NewSchedule("* * * * * * *", time.Time{}) if err != nil { t.Fatal(err) } @@ -144,11 +257,11 @@ func TestSchedule_Next(t *testing.T) { t.Run("fires the correct number of times for the interval with a single schedulable", func(t *testing.T) { c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: } }} mockTime := clock.NewMock() @@ -164,7 +277,7 @@ func TestSchedule_Next(t *testing.T) { t.Fatal(err) } defer sch.Stop() - schedule, err := NewSchedule("* * * * * * *") + schedule, _, err := NewSchedule("* * * * * * *", time.Time{}) if err != nil { t.Fatal(err) } @@ -216,7 +329,7 @@ func TestSchedule_Next(t *testing.T) { ts 
time.Time id ID }, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") @@ -224,7 +337,7 @@ func TestSchedule_Next(t *testing.T) { ts time.Time id ID }{ - ts: scheduledAt, + ts: scheduledFor, id: id, }: } @@ -242,7 +355,7 @@ func TestSchedule_Next(t *testing.T) { t.Fatal(err) } defer sch.Stop() - schedule, err := NewSchedule("* * * * * * *") + schedule, _, err := NewSchedule("* * * * * * *", time.Time{}) if err != nil { t.Fatal(err) } @@ -251,7 +364,8 @@ func TestSchedule_Next(t *testing.T) { if err != nil { t.Fatal(err) } - schedule2, err := NewSchedule("*/2 * * * * * *") + + schedule2, _, err := NewSchedule("*/2 * * * * * *", time.Time{}) if err != nil { t.Fatal(err) } @@ -303,7 +417,7 @@ func TestTreeScheduler_Stop(t *testing.T) { now := time.Now().Add(-20 * time.Second) mockTime := clock.NewMock() mockTime.Set(now) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {}} + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {}} sch, _, err := NewScheduler(exe, &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { return nil }}, @@ -322,7 +436,7 @@ func TestSchedule_panic(t *testing.T) { err error }, 1) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { panic("yikes oh no!") }} @@ -345,7 +459,7 @@ func TestSchedule_panic(t *testing.T) { t.Fatal(err) } - schedule, err := NewSchedule("* * * * * * *") + schedule, _, err := NewSchedule("* * * * * * *", time.Time{}) if err != nil { t.Fatal(err) } @@ -370,7 +484,7 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) { mockTime := clock.NewMock() 
mockTime.Set(now) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") @@ -391,23 +505,23 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) { defer sch.Stop() // this tests for a race condition in the btree that isn't normally caught by the race detector - schedule, err := NewSchedule("* * * * * * *") + schedule, ts, err := NewSchedule("* * * * * * *", now.Add(-1*time.Second)) if err != nil { t.Fatal(err) } - badSchedule, err := NewSchedule("0 0 1 12 *") + badSchedule, ts, err := NewSchedule("0 0 1 12 *", now.Add(-1*time.Second)) if err != nil { t.Fatal(err) } for i := ID(1); i <= 2000; i++ { // since a valid ID probably shouldn't be zero if i%100 == 0 { - err = sch.Schedule(mockSchedulable{id: i, schedule: badSchedule, offset: 0, lastScheduled: now.Add(-1 * time.Second)}) + err = sch.Schedule(mockSchedulable{id: i, schedule: badSchedule, offset: 0, lastScheduled: ts}) if err != nil { t.Fatal(err) } } else { - err = sch.Schedule(mockSchedulable{id: i, schedule: schedule, offset: 0, lastScheduled: now.Add(-1 * time.Second)}) + err = sch.Schedule(mockSchedulable{id: i, schedule: schedule, offset: 0, lastScheduled: ts}) if err != nil { t.Fatal(err) } @@ -423,11 +537,11 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) { func TestTreeScheduler_Release(t *testing.T) { c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: } }} mockTime := clock.NewMock() @@ -443,12 +557,12 @@ func TestTreeScheduler_Release(t *testing.T) { t.Fatal(err) } defer sch.Stop() - schedule, err := NewSchedule("* * * * * * *") + 
schedule, ts, err := NewSchedule("* * * * * * *", mockTime.Now().UTC()) if err != nil { t.Fatal(err) } - err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: mockTime.Now().UTC()}) + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts.UTC()}) if err != nil { t.Fatal(err) } @@ -463,7 +577,9 @@ func TestTreeScheduler_Release(t *testing.T) { case <-time.After(6 * time.Second): t.Fatalf("test timed out, it should have fired but didn't") } - sch.Release(1) + if err := sch.Release(1); err != nil { + t.Error(err) + } go func() { sch.mu.Lock() @@ -477,3 +593,76 @@ func TestTreeScheduler_Release(t *testing.T) { case <-time.After(2 * time.Second): } } + +func mustCron(s string) Schedule { + cr, err := cron.ParseUTC(s) + if err != nil { + panic(err) + } + return Schedule{cron: cr} +} + +func TestNewSchedule(t *testing.T) { + tests := []struct { + name string + unparsed string + lastScheduledAt time.Time + want Schedule + want1 time.Time + wantErr bool + }{ + { + name: "bad cron", + unparsed: "this is not a cron string", + lastScheduledAt: time.Now(), + wantErr: true, + }, + { + name: "align to minute", + unparsed: "@every 1m", + lastScheduledAt: time.Date(2016, 01, 01, 01, 10, 23, 1234567, time.UTC), + want: mustCron("@every 1m"), + want1: time.Date(2016, 01, 01, 01, 10, 0, 0, time.UTC), + }, + { + name: "align to minute with @every 7m", + unparsed: "@every 7m", + lastScheduledAt: time.Date(2016, 01, 01, 01, 10, 23, 1234567, time.UTC), + want: mustCron("@every 7m"), + want1: time.Date(2016, 01, 01, 01, 4, 0, 0, time.UTC), + }, + + { + name: "align to hour", + unparsed: "@every 1h", + lastScheduledAt: time.Date(2016, 01, 01, 01, 10, 23, 1234567, time.UTC), + want: mustCron("@every 1h"), + want1: time.Date(2016, 01, 01, 01, 0, 0, 0, time.UTC), + }, + { + name: "align to hour @every 3h", + unparsed: "@every 3h", + lastScheduledAt: time.Date(2016, 01, 01, 01, 10, 23, 1234567, time.UTC), 
+ want: mustCron("@every 3h"), + want1: time.Date(2016, 01, 01, 00, 0, 0, 0, time.UTC), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1, err := NewSchedule(tt.unparsed, tt.lastScheduledAt) + if (err != nil) != tt.wantErr { + t.Errorf("NewSchedule() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil { + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewSchedule() got = %v, want %v", got, tt.want) + } + if !reflect.DeepEqual(got1, tt.want1) { + t.Errorf("NewSchedule() got1 = %v, want %v", got1, tt.want1) + } + }) + } +} diff --git a/task/backend/scheduler/treescheduler.go b/task/backend/scheduler/treescheduler.go index e2e4fb1ed34..e816ab97186 100644 --- a/task/backend/scheduler/treescheduler.go +++ b/task/backend/scheduler/treescheduler.go @@ -78,7 +78,7 @@ type TreeScheduler struct { } // ErrorFunc is a function for error handling. It is a good way to inject logging into a TreeScheduler. -type ErrorFunc func(ctx context.Context, taskID ID, scheduledAt time.Time, err error) +type ErrorFunc func(ctx context.Context, taskID ID, scheduledFor time.Time, err error) type treeSchedulerOptFunc func(t *TreeScheduler) error @@ -318,7 +318,7 @@ func (s *TreeScheduler) work(ctx context.Context, ch chan Item) { s.sm.reportScheduleDelay(time.Since(it.Next())) preExec := time.Now() // execute - err = s.executor.Execute(ctx, it.id, t) + err = s.executor.Execute(ctx, it.id, t, it.when()) // report how long execution took s.sm.reportExecution(err, time.Since(preExec)) return err diff --git a/task/backend/task.go b/task/backend/task.go index 326e52b07c4..b497746497f 100644 --- a/task/backend/task.go +++ b/task/backend/task.go @@ -22,10 +22,10 @@ type TaskControlService interface { NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) // CreateRun creates a run with a schedule for time. 
- // This differes from CreateNextRun in that it should not to use some scheduling system to determin when the run + // This differs from CreateNextRun in that it should not to use some scheduling system to determine when the run // should happen. // TODO(lh): remove comment once we no longer need create next run. - CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) + CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) diff --git a/task/mock/task_control_service.go b/task/mock/task_control_service.go index e7093f8d72e..3cec8d79122 100644 --- a/task/mock/task_control_service.go +++ b/task/mock/task_control_service.go @@ -155,7 +155,7 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back }, nil } -func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { +func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { t.mu.Lock() defer t.mu.Unlock() diff --git a/ui/cypress/e2e/orgs.test.ts b/ui/cypress/e2e/orgs.test.ts new file mode 100644 index 00000000000..99d8baeb42f --- /dev/null +++ b/ui/cypress/e2e/orgs.test.ts @@ -0,0 +1,21 @@ +describe('Orgs', () => { + beforeEach(() => { + cy.flush() + }) + + describe('when there is a user with no orgs', () => { + beforeEach(() => { + cy.signin().then(({body}) => { + cy.deleteOrg(body.org.id) + }) + + cy.visit('/') + }) + + it('forwards the user to the No Orgs Page', () => { + cy.url().should('contain', 'no-org') + cy.contains('Sign In').click() + cy.url().should('contain', 'signin') + }) + }) +}) diff --git a/ui/cypress/e2e/queryBuilder.test.ts b/ui/cypress/e2e/queryBuilder.test.ts 
index 3f6d846470a..05cbcc959b9 100644 --- a/ui/cypress/e2e/queryBuilder.test.ts +++ b/ui/cypress/e2e/queryBuilder.test.ts @@ -88,7 +88,9 @@ describe('The Query Builder', () => { }) }) - describe('from the Dashboard view', () => { + // This is flaky in prod + // https://circleci.com/gh/influxdata/influxdb/74628#artifacts/containers/0 + describe.skip('from the Dashboard view', () => { beforeEach(() => { cy.get('@org').then((org: Organization) => { cy.createDashboard(org.id).then(({body}) => { diff --git a/ui/cypress/index.d.ts b/ui/cypress/index.d.ts index c255b24c4f9..05e739055ad 100644 --- a/ui/cypress/index.d.ts +++ b/ui/cypress/index.d.ts @@ -8,6 +8,7 @@ import { createCell, createOrg, createSource, + deleteOrg, flush, getByTestID, getByInputName, @@ -48,6 +49,7 @@ declare global { createDashWithViewAndVar: typeof createDashWithViewAndVar createView: typeof createView createOrg: typeof createOrg + deleteOrg: typeof deleteOrg flush: typeof flush getByTestID: typeof getByTestID getByInputName: typeof getByInputName diff --git a/ui/cypress/support/commands.ts b/ui/cypress/support/commands.ts index 49e69a56bc7..8fbdc5cd5a3 100644 --- a/ui/cypress/support/commands.ts +++ b/ui/cypress/support/commands.ts @@ -123,6 +123,13 @@ export const createOrg = (): Cypress.Chainable => { }) } +export const deleteOrg = (id: string): Cypress.Chainable => { + return cy.request({ + method: 'DELETE', + url: `/api/v2/orgs/${id}`, + }) +} + export const createBucket = ( orgID?: string, organization?: string, @@ -452,6 +459,7 @@ Cypress.Commands.add('createView', createView) // orgs Cypress.Commands.add('createOrg', createOrg) +Cypress.Commands.add('deleteOrg', deleteOrg) // buckets Cypress.Commands.add('createBucket', createBucket) diff --git a/ui/package.json b/ui/package.json index 36b69015031..78c26444c83 100644 --- a/ui/package.json +++ b/ui/package.json @@ -129,7 +129,7 @@ "dependencies": { "@influxdata/clockface": "1.1.0", "@influxdata/flux-parser": "^0.3.0", - 
"@influxdata/giraffe": "0.16.10", + "@influxdata/giraffe": "0.16.11", "@influxdata/influx": "0.5.5", "@influxdata/influxdb-templates": "0.9.0", "@influxdata/react-custom-scrollbars": "4.3.8", diff --git a/ui/src/dashboards/actions/index.ts b/ui/src/dashboards/actions/index.ts index b519b07813d..38ae1335673 100644 --- a/ui/src/dashboards/actions/index.ts +++ b/ui/src/dashboards/actions/index.ts @@ -163,13 +163,30 @@ export const editDashboard = (dashboard: Dashboard): EditDashboardAction => ({ export const setDashboards = ( status: RemoteDataState, list?: Dashboard[] -): SetDashboardsAction => ({ - type: ActionTypes.SetDashboards, - payload: { - status, - list, - }, -}) +): SetDashboardsAction => { + if (list) { + list = list.map(obj => { + if (obj.name === undefined) { + obj.name = '' + } + if (obj.meta === undefined) { + obj.meta = {} + } + if (obj.meta.updatedAt === undefined) { + obj.meta.updatedAt = new Date().toDateString() + } + return obj + }) + } + + return { + type: ActionTypes.SetDashboards, + payload: { + status, + list, + }, + } +} export const setDashboard = (dashboard: Dashboard): SetDashboardAction => ({ type: ActionTypes.SetDashboard, diff --git a/ui/src/dashboards/components/dashboard_index/Table.tsx b/ui/src/dashboards/components/dashboard_index/Table.tsx index 723e0dab85a..caf77b3e516 100644 --- a/ui/src/dashboards/components/dashboard_index/Table.tsx +++ b/ui/src/dashboards/components/dashboard_index/Table.tsx @@ -31,7 +31,7 @@ interface State { sortType: SortTypes } -type SortKey = keyof Dashboard | 'modified' +type SortKey = keyof Dashboard | 'meta.updatedAt' type Props = OwnProps & WithRouterProps @@ -58,15 +58,15 @@ class DashboardsTable extends PureComponent { @@ -91,14 +91,9 @@ class DashboardsTable extends PureComponent { ) } - private get headerKeys(): SortKey[] { - return ['name', 'modified'] - } - private handleClickColumn = (nextSort: Sort, sortKey: SortKey) => { let sortType = SortTypes.String - - if (sortKey === 'modified') { + if 
(sortKey === 'meta.updatedAt') { sortType = SortTypes.Date } diff --git a/ui/src/dashboards/constants/index.ts b/ui/src/dashboards/constants/index.ts index bc1cd40e6b2..4ceacc01f68 100644 --- a/ui/src/dashboards/constants/index.ts +++ b/ui/src/dashboards/constants/index.ts @@ -36,7 +36,7 @@ export const FORMAT_OPTIONS: Array<{text: string}> = [ {text: 'DD/MM/YYYY HH:mm:ss.sss'}, {text: 'MM/DD/YYYY HH:mm:ss.sss'}, {text: 'YYYY/MM/DD HH:mm:ss'}, - {text: 'HH:mm a'}, + {text: 'hh:mm a'}, {text: 'HH:mm'}, {text: 'HH:mm:ss'}, {text: 'HH:mm:ss ZZ'}, diff --git a/ui/src/index.tsx b/ui/src/index.tsx index ac764439da6..7e3bf1bd2e4 100644 --- a/ui/src/index.tsx +++ b/ui/src/index.tsx @@ -88,6 +88,7 @@ import NewRuleOverlay from 'src/alerting/components/notifications/NewRuleOverlay import EditRuleOverlay from 'src/alerting/components/notifications/EditRuleOverlay' import NewEndpointOverlay from 'src/alerting/components/endpoints/NewEndpointOverlay' import EditEndpointOverlay from 'src/alerting/components/endpoints/EditEndpointOverlay' +import NoOrgsPage from 'src/organizations/containers/NoOrgsPage' // Overlays import OverlayHandler, { @@ -196,6 +197,7 @@ class Root extends PureComponent { + diff --git a/ui/src/organizations/containers/NoOrgsPage.tsx b/ui/src/organizations/containers/NoOrgsPage.tsx new file mode 100644 index 00000000000..31ccd12ed94 --- /dev/null +++ b/ui/src/organizations/containers/NoOrgsPage.tsx @@ -0,0 +1,49 @@ +// Libraries +import React, {FC} from 'react' +import {FunnelPage, Button, ComponentColor} from '@influxdata/clockface' +import {withRouter, WithRouterProps} from 'react-router' +import {connect} from 'react-redux' + +// Types +import {AppState} from 'src/types' + +interface DispatchProps { + me: AppState['me'] +} + +type Props = DispatchProps & WithRouterProps + +const NoOrgsPage: FC = ({router, me}) => { + const handleClick = () => { + router.push('/signin') + } + + return ( + +
+ +
+

Whoops!

+

+ You don't belong to an organization. +
+ Add user {`"${me.name}"`} to an organization to + continue +

+