Skip to content

Commit

Permalink
feat(pkger): add the ability to remove a stack and all its associated…
Browse files Browse the repository at this point in the history
… resources

closes: #17554
  • Loading branch information
jsteenb2 committed May 1, 2020
1 parent 369186d commit 7400b5c
Show file tree
Hide file tree
Showing 12 changed files with 261 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Features

1. [17934](https://github.com/influxdata/influxdb/pull/17934): Add ability to delete a stack and all the resources associated with it

### Bug Fixes

1. [17906](https://github.com/influxdata/influxdb/pull/17906): Ensure UpdateUser cleans up the index when updating names
Expand Down
6 changes: 6 additions & 0 deletions cmd/influx/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,8 @@ type fakePkgSVC struct {
applyFn func(ctx context.Context, orgID, userID influxdb.ID, pkg *pkger.Pkg, opts ...pkger.ApplyOptFn) (pkger.Summary, pkger.Diff, error)
}

var _ pkger.SVC = (*fakePkgSVC)(nil)

func (f *fakePkgSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger.Stack) (pkger.Stack, error) {
if f.initStackFn != nil {
return f.initStackFn(ctx, userID, stack)
Expand All @@ -719,6 +721,10 @@ func (f *fakePkgSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter p
panic("not implemented")
}

func (f *fakePkgSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
panic("not implemented")
}

func (f *fakePkgSVC) CreatePkg(ctx context.Context, setters ...pkger.CreatePkgSetFn) (*pkger.Pkg, error) {
if f.createFn != nil {
return f.createFn(ctx, setters...)
Expand Down
99 changes: 96 additions & 3 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,88 @@ func TestLauncher_Pkger(t *testing.T) {
})
})

t.Run("delete stack will delete the stack and all resources associated with it", func(t *testing.T) {
newStack, err := svc.InitStack(ctx, l.User.ID, pkger.Stack{
OrgID: l.Org.ID,
})
require.NoError(t, err)

newEndpointPkgName := "non_existent_endpoint"
allResourcesPkg := newPkg(
newBucketObject("non_existent_bucket", "", ""),
newCheckDeadmanObject(t, "non_existent_check", "", time.Minute),
newDashObject("non_existent_dash", "", ""),
newEndpointHTTP(newEndpointPkgName, "", ""),
newLabelObject("non_existent_label", "", "", ""),
newRuleObject(t, "non_existent_rule", "", newEndpointPkgName, ""),
newTaskObject("non_existent_task", "", ""),
newTelegrafObject("non_existent_tele", "", ""),
newVariableObject("non_existent_var", "", ""),
)

sum, _, err := svc.Apply(ctx, l.Org.ID, l.User.ID, allResourcesPkg, pkger.ApplyWithStackID(newStack.ID))
require.NoError(t, err)

require.Len(t, sum.Buckets, 1)
assert.NotZero(t, sum.Buckets[0].ID)
require.Len(t, sum.Checks, 1)
assert.NotZero(t, sum.Checks[0].Check.GetID())
require.Len(t, sum.Dashboards, 1)
assert.NotZero(t, sum.Dashboards[0].ID)
require.Len(t, sum.Labels, 1)
assert.NotZero(t, sum.Labels[0].ID)
require.Len(t, sum.NotificationEndpoints, 1)
assert.NotZero(t, sum.NotificationEndpoints[0].NotificationEndpoint.GetID())
require.Len(t, sum.NotificationRules, 1)
assert.NotZero(t, sum.NotificationRules[0].ID)
require.Len(t, sum.Tasks, 1)
assert.NotZero(t, sum.Tasks[0].ID)
require.Len(t, sum.TelegrafConfigs, 1)
assert.NotZero(t, sum.TelegrafConfigs[0].TelegrafConfig.ID)
require.Len(t, sum.Variables, 1)
assert.NotZero(t, sum.Variables[0].ID)

err = svc.DeleteStack(ctx, struct{ OrgID, UserID, StackID influxdb.ID }{
OrgID: l.Org.ID,
UserID: l.User.ID,
StackID: newStack.ID,
})
require.NoError(t, err)

matchingStacks, err := svc.ListStacks(ctx, l.Org.ID, pkger.ListFilter{
StackIDs: []influxdb.ID{newStack.ID},
})
require.NoError(t, err)
require.Empty(t, matchingStacks)

_, err = resourceCheck.getBucket(t, byID(influxdb.ID(sum.Buckets[0].ID)))
assert.Error(t, err)

_, err = resourceCheck.getCheck(t, byID(sum.Checks[0].Check.GetID()))
assert.Error(t, err)

_, err = resourceCheck.getDashboard(t, byID(influxdb.ID(sum.Dashboards[0].ID)))
assert.Error(t, err)

_, err = resourceCheck.getLabel(t, byID(influxdb.ID(sum.Labels[0].ID)))
assert.Error(t, err)

_, err = resourceCheck.getEndpoint(t, byID(sum.NotificationEndpoints[0].NotificationEndpoint.GetID()))
assert.Error(t, err)

_, err = resourceCheck.getRule(t, byID(influxdb.ID(sum.NotificationRules[0].ID)))
assert.Error(t, err)

_, err = resourceCheck.getTask(t, byID(influxdb.ID(sum.Tasks[0].ID)))
assert.Error(t, err)

_, err = resourceCheck.getTelegrafConfig(t, byID(sum.TelegrafConfigs[0].TelegrafConfig.ID))
assert.Error(t, err)

_, err = resourceCheck.getVariable(t, byID(influxdb.ID(sum.Variables[0].ID)))
assert.Error(t, err)
})

t.Run("apply with only a stackID succeeds when stack has URLs", func(t *testing.T) {
svr := httptest.NewServer(nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
pkg := newPkg(newBucketObject("bucket_0", "", ""))
Expand Down Expand Up @@ -2296,6 +2378,12 @@ type (
getResourceOptFn func() getResourceOpt
)

func byID(id influxdb.ID) getResourceOptFn {
return func() getResourceOpt {
return getResourceOpt{id: id}
}
}

func byName(name string) getResourceOptFn {
return func() getResourceOpt {
return getResourceOpt{name: name}
Expand Down Expand Up @@ -2507,8 +2595,11 @@ func (r resourceChecker) getLabel(t *testing.T, getOpt getResourceOptFn) (influx
default:
require.Fail(t, "did not provide any get option")
}
if err != nil {
return influxdb.Label{}, err
}

return *label, err
return *label, nil
}

func (r resourceChecker) mustGetLabel(t *testing.T, getOpt getResourceOptFn) influxdb.Label {
Expand Down Expand Up @@ -2700,8 +2791,10 @@ func (r resourceChecker) getVariable(t *testing.T, getOpt getResourceOptFn) (inf
default:
require.Fail(t, "did not provide any get option")
}

return *variable, err
if err != nil {
return influxdb.Variable{}, err
}
return *variable, nil
}

func (r resourceChecker) mustGetVariable(t *testing.T, getOpt getResourceOptFn) influxdb.Variable {
Expand Down
28 changes: 28 additions & 0 deletions http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4510,6 +4510,34 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/packages/stacks/{stack_id}:
delete:
operationId: DeleteStack
tags:
- InfluxPackages
summary: Delete a stack and remove all its associated resources
parameters:
- in: path
name: stack_id
required: true
schema:
type: string
description: The stack id to be removed
- in: query
name: orgID
required: true
schema:
type: string
description: The organization id of the user
responses:
'204':
description: Stack and all its associated resources are deleted
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/tasks:
get:
operationId: GetTasks
Expand Down
7 changes: 7 additions & 0 deletions pkger/http_remote_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func (s *HTTPRemoteService) InitStack(ctx context.Context, userID influxdb.ID, s
return newStack, nil
}

func (s *HTTPRemoteService) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
return s.Client.
Delete(RoutePrefix, "stacks", identifiers.StackID.String()).
QueryParams([2]string{"orgID", identifiers.OrgID.String()}).
Do(ctx)
}

func (s *HTTPRemoteService) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
queryParams := [][2]string{{"orgID", orgID.String()}}
for _, name := range f.Names {
Expand Down
60 changes: 60 additions & 0 deletions pkger/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ func NewHTTPServer(log *zap.Logger, svc SVC) *HTTPServer {
{
r.With(middleware.AllowContentType("text/yml", "application/x-yaml", "application/json")).
Post("/", svr.createPkg)

r.With(middleware.SetHeader("Content-Type", "application/json; charset=utf-8")).
Post("/apply", svr.applyPkg)

r.Route("/stacks", func(r chi.Router) {
r.Post("/", svr.createStack)
r.Get("/", svr.listStacks)
r.Delete("/{stack_id}", svr.deleteStack)
})
}

Expand Down Expand Up @@ -203,6 +206,63 @@ func (s *HTTPServer) createStack(w http.ResponseWriter, r *http.Request) {
})
}

func (s *HTTPServer) deleteStack(w http.ResponseWriter, r *http.Request) {
orgID, err := getRequiredOrgIDFromQuery(r.URL.Query())
if err != nil {
s.api.Err(w, err)
return
}

stackID, err := influxdb.IDFromString(chi.URLParam(r, "stack_id"))
if err != nil {
s.api.Err(w, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "the stack id provided in the path was invalid",
Err: err,
})
return
}

auth, err := pctx.GetAuthorizer(r.Context())
if err != nil {
s.api.Err(w, err)
return
}
userID := auth.GetUserID()

err = s.svc.DeleteStack(r.Context(), struct{ OrgID, UserID, StackID influxdb.ID }{
OrgID: orgID,
UserID: userID,
StackID: *stackID,
})
if err != nil {
s.api.Err(w, err)
return
}

s.api.Respond(w, http.StatusNoContent, nil)
}

func getRequiredOrgIDFromQuery(q url.Values) (influxdb.ID, error) {
orgIDRaw := q.Get("orgID")
if orgIDRaw == "" {
return 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "the orgID query param is required",
}
}

orgID, err := influxdb.IDFromString(orgIDRaw)
if err != nil {
return 0, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "the orgID query param was invalid",
Err: err,
}
}
return *orgID, nil
}

// ReqCreateOrgIDOpt provides options to export resources by organization id.
type ReqCreateOrgIDOpt struct {
OrgID string `json:"orgID"`
Expand Down
4 changes: 4 additions & 0 deletions pkger/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,10 @@ func (f *fakeSVC) InitStack(ctx context.Context, userID influxdb.ID, stack pkger
return f.initStack(ctx, userID, stack)
}

func (f *fakeSVC) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
panic("not implemented yet")
}

func (f *fakeSVC) ListStacks(ctx context.Context, orgID influxdb.ID, filter pkger.ListFilter) ([]pkger.Stack, error) {
if f.listStacksFn == nil {
panic("not implemented")
Expand Down
22 changes: 21 additions & 1 deletion pkger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const ResourceTypeStack influxdb.ResourceType = "stack"
// SVC is the packages service interface.
type SVC interface {
InitStack(ctx context.Context, userID influxdb.ID, stack Stack) (Stack, error)
DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error
ListStacks(ctx context.Context, orgID influxdb.ID, filter ListFilter) ([]Stack, error)

CreatePkg(ctx context.Context, setters ...CreatePkgSetFn) (*Pkg, error)
Expand Down Expand Up @@ -298,6 +299,25 @@ func (s *Service) InitStack(ctx context.Context, userID influxdb.ID, stack Stack
return stack, nil
}

// DeleteStack removes a stack and all the resources that have are associated with the stack.
func (s *Service) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (e error) {
// providing empty Pkg will remove all applied resources
state, err := s.dryRun(ctx, identifiers.OrgID, new(Pkg), applyOptFromOptFns(ApplyWithStackID(identifiers.StackID)))
if err != nil {
return err
}

coordinator := &rollbackCoordinator{sem: make(chan struct{}, s.applyReqLimit)}
defer coordinator.rollback(s.log, &e, identifiers.OrgID)

err = s.applyState(ctx, coordinator, identifiers.OrgID, identifiers.UserID, state, nil)
if err != nil {
return err
}

return s.store.DeleteStack(ctx, identifiers.StackID)
}

// ListFilter are filter options for filtering stacks from being returned.
type ListFilter struct {
StackIDs []influxdb.ID
Expand Down Expand Up @@ -685,7 +705,7 @@ func (s *Service) dryRun(ctx context.Context, orgID influxdb.ID, pkg *Pkg, opt A
// will be skipped, and won't bleed into the dry run here. We can now return
// a error (parseErr) and valid diff/summary.
var parseErr error
err := pkg.Validate()
err := pkg.Validate(ValidWithoutResources())
if err != nil && !IsParseErr(err) {
return nil, internalErr(err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkger/service_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ func (s *authMW) InitStack(ctx context.Context, userID influxdb.ID, newStack Sta
return s.next.InitStack(ctx, userID, newStack)
}

func (s *authMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
err := s.authAgent.IsWritable(ctx, identifiers.OrgID, ResourceTypeStack)
if err != nil {
return err
}
return s.next.DeleteStack(ctx, identifiers)
}

func (s *authMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
err := s.authAgent.OrgPermissions(ctx, orgID, influxdb.ReadAction)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkger/service_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,24 @@ func (s *loggingMW) InitStack(ctx context.Context, userID influxdb.ID, newStack
return s.next.InitStack(ctx, userID, newStack)
}

func (s *loggingMW) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) (err error) {
defer func(start time.Time) {
if err == nil {
return
}

s.logger.Error(
"failed to delete stack",
zap.Error(err),
zap.Stringer("orgID", identifiers.OrgID),
zap.Stringer("userID", identifiers.OrgID),
zap.Stringer("stackID", identifiers.StackID),
zap.Duration("took", time.Since(start)),
)
}(time.Now())
return s.next.DeleteStack(ctx, identifiers)
}

func (s *loggingMW) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) (stacks []Stack, err error) {
defer func(start time.Time) {
if err == nil {
Expand Down
5 changes: 5 additions & 0 deletions pkger/service_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (s *mwMetrics) InitStack(ctx context.Context, userID influxdb.ID, newStack
return stack, rec(err)
}

func (s *mwMetrics) DeleteStack(ctx context.Context, identifiers struct{ OrgID, UserID, StackID influxdb.ID }) error {
rec := s.rec.Record("delete_stack")
return rec(s.next.DeleteStack(ctx, identifiers))
}

func (s *mwMetrics) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFilter) ([]Stack, error) {
rec := s.rec.Record("list_stacks")
stacks, err := s.next.ListStacks(ctx, orgID, f)
Expand Down
Loading

0 comments on commit 7400b5c

Please sign in to comment.