diff --git a/CHANGELOG.md b/CHANGELOG.md index e6c5ade4987..0b9c4a49160 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,8 @@ - [#5666](https://github.com/influxdata/influxdb/pull/5666): Manage dependencies with gdm - [#5512](https://github.com/influxdata/influxdb/pull/5512): HTTP: Add config option to enable HTTP JSON write path which is now disabled by default. - [#5336](https://github.com/influxdata/influxdb/pull/5366): Enabled golint for influxql. @gabelev -- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup -cleanup +- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup +- [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped. ### Bugfixes diff --git a/cluster/query_executor.go b/cluster/query_executor.go index bbbdda72e4a..a517ae9563d 100644 --- a/cluster/query_executor.go +++ b/cluster/query_executor.go @@ -293,24 +293,29 @@ func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabase return err } - // Retrieve a list of all shard ids. - var shardIDs []uint64 - for _, rp := range dbi.RetentionPolicies { - for _, sg := range rp.ShardGroups { - for _, s := range sg.Shards { - shardIDs = append(shardIDs, s.ID) - } - } - } - // Remove the database from the local store - if err := e.TSDBStore.DeleteDatabase(stmt.Name, shardIDs); err != nil { + if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil { return err } return nil } +// executeDropRetentionPolicy closes all local shards for the retention +// policy and removes the directory. +func (q *QueryExecutor) executeDropRetentionPolicy(stmt *influxql.DropRetentionPolicyStatement) error { + // Check if the database and retention policy exist. + if _, err := q.MetaClient.RetentionPolicy(stmt.Database, stmt.Name); err != nil { + return err + } + + // Remove the retention policy from the local store. 
+ if err := q.TSDBStore.DeleteRetentionPolicy(stmt.Database, stmt.Name); err != nil { + return err + } + return q.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name) +} + func (e *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error { return e.TSDBStore.DeleteMeasurement(database, stmt.Name) } diff --git a/services/meta/client.go b/services/meta/client.go index 82fb5a28e53..82f24ad6c8b 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -412,7 +412,7 @@ func (c *Client) RetentionPolicy(database, name string) (rpi *RetentionPolicyInf // TODO: This should not be handled here if db == nil { - return nil, ErrDatabaseNotExists + return nil, influxdb.ErrDatabaseNotFound(database) } return db.RetentionPolicy(name), nil diff --git a/tsdb/store.go b/tsdb/store.go index 7183ba14d1f..f18ec0b08aa 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -35,8 +35,13 @@ type Store struct { path string databaseIndexes map[string]*DatabaseIndex - // shards is a map of shard IDs to Shards for *ALL DATABASES*. - shards map[uint64]*Shard + + // shardLocations is a map of shard IDs to both the associated + // Shard, and meta information about where the shard is located on + // disk. + // + // shardLocations stores mappings for all shards on all databases. + shardLocations map[uint64]*shardLocation EngineOptions EngineOptions Logger *log.Logger @@ -70,7 +75,7 @@ func (s *Store) Open() error { s.closing = make(chan struct{}) - s.shards = map[uint64]*Shard{} + s.shardLocations = map[uint64]*shardLocation{} s.databaseIndexes = map[string]*DatabaseIndex{} s.Logger.Printf("Using data dir: %v", s.Path()) @@ -136,7 +141,7 @@ func (s *Store) loadShards() error { // Shard file names are numeric shardIDs shardID, err := strconv.ParseUint(sh.Name(), 10, 64) if err != nil { - s.Logger.Printf("Skipping shard: %s. Not a valid path", rp.Name()) + s.Logger.Printf("%s is not a valid ID. 
Skipping shard.", sh.Name()) continue } @@ -145,7 +150,8 @@ func (s *Store) loadShards() error { if err != nil { return fmt.Errorf("failed to open shard %d: %s", shardID, err) } - s.shards[shardID] = shard + + s.shardLocations[shardID] = &shardLocation{Database: db, RetentionPolicy: rp.Name(), Shard: shard} } } } @@ -164,13 +170,13 @@ func (s *Store) Close() error { } s.wg.Wait() - for _, sh := range s.shards { - if err := sh.Close(); err != nil { + for _, sl := range s.shardLocations { + if err := sl.Shard.Close(); err != nil { return err } } s.opened = false - s.shards = nil + s.shardLocations = nil s.databaseIndexes = nil return nil @@ -187,7 +193,11 @@ func (s *Store) DatabaseIndexN() int { func (s *Store) Shard(id uint64) *Shard { s.mu.RLock() defer s.mu.RUnlock() - return s.shards[id] + sl, ok := s.shardLocations[id] + if !ok { + return nil + } + return sl.Shard } // Shards returns a list of shards by id. @@ -196,11 +206,11 @@ func (s *Store) Shards(ids []uint64) []*Shard { defer s.mu.RUnlock() a := make([]*Shard, 0, len(ids)) for _, id := range ids { - sh := s.shards[id] - if sh == nil { + sl, ok := s.shardLocations[id] + if !ok { continue } - a = append(a, sh) + a = append(a, sl.Shard) } return a } @@ -209,7 +219,7 @@ func (s *Store) Shards(ids []uint64) []*Shard { func (s *Store) ShardN() int { s.mu.RLock() defer s.mu.RUnlock() - return len(s.shards) + return len(s.shardLocations) } // CreateShard creates a shard with the given id and retention policy on a database. 
@@ -224,7 +234,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er } // shard already exists - if _, ok := s.shards[shardID]; ok { + if _, ok := s.shardLocations[shardID]; ok { return nil } @@ -252,7 +262,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er return err } - s.shards[shardID] = shard + s.shardLocations[shardID] = &shardLocation{Database: database, RetentionPolicy: retentionPolicy, Shard: shard} return nil } @@ -261,41 +271,48 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er func (s *Store) DeleteShard(shardID uint64) error { s.mu.Lock() defer s.mu.Unlock() + return s.deleteShard(shardID) +} +// deleteShard removes a shard from disk. Callers of deleteShard need +// to handle locks appropriately. +func (s *Store) deleteShard(shardID uint64) error { // ensure shard exists - sh, ok := s.shards[shardID] + sl, ok := s.shardLocations[shardID] if !ok { return nil } - if err := sh.Close(); err != nil { + if err := sl.Shard.Close(); err != nil { return err } - if err := os.RemoveAll(sh.path); err != nil { + if err := os.RemoveAll(sl.Shard.path); err != nil { return err } - if err := os.RemoveAll(sh.walPath); err != nil { + if err := os.RemoveAll(sl.Shard.walPath); err != nil { return err } - delete(s.shards, shardID) - + delete(s.shardLocations, shardID) return nil } -// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk. -func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error { +// DeleteDatabase will close all shards associated with a database and +// remove the directory and files from disk. +func (s *Store) DeleteDatabase(name string) error { s.mu.Lock() defer s.mu.Unlock() - for _, id := range shardIDs { - shard := s.shards[id] - if shard != nil { - shard.Close() + // Close and delete all shards on the database. 
+ for shardID, location := range s.shardLocations { + if location.IsDatabase(name) { + // Delete the shard from disk. + if err := s.deleteShard(shardID); err != nil { + return err + } } - delete(s.shards, id) } if err := os.RemoveAll(filepath.Join(s.path, name)); err != nil { @@ -306,10 +323,36 @@ func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error { } delete(s.databaseIndexes, name) - return nil } +// DeleteRetentionPolicy will close all shards associated with the +// provided retention policy, remove the retention policy directories on +// both the DB and WAL, and remove all shard files from disk. +func (s *Store) DeleteRetentionPolicy(database, name string) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Close and delete all shards under the retention policy on the + // database. + for shardID, location := range s.shardLocations { + if location.IsDatabase(database) && location.IsRetentionPolicy(name) { + // Delete the shard from disk. + if err := s.deleteShard(shardID); err != nil { + return err + } + } + } + + // Remove the retention policy folder. + if err := os.RemoveAll(filepath.Join(s.path, database, name)); err != nil { + return err + } + + // Remove the retention policy folder from the WAL. + return os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name)) +} + // DeleteMeasurement removes a measurement and all associated series from a database. func (s *Store) DeleteMeasurement(database, name string) error { s.mu.Lock() @@ -331,11 +374,12 @@ func (s *Store) DeleteMeasurement(database, name string) error { db.DropMeasurement(m.Name) // Remove underlying data. 
- for _, sh := range s.shards { - if sh.index != db { + for _, sl := range s.shardLocations { + if !sl.IsDatabase(database) { continue } - if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil { + + if err := sl.Shard.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil { return err } } @@ -351,8 +395,8 @@ func (s *Store) ShardIDs() []uint64 { } func (s *Store) shardIDs() []uint64 { - a := make([]uint64, 0, len(s.shards)) - for shardID := range s.shards { + a := make([]uint64, 0, len(s.shardLocations)) + for shardID := range s.shardLocations { a = append(a, shardID) } return a @@ -360,9 +404,9 @@ func (s *Store) shardIDs() []uint64 { // shardsSlice returns an ordered list of shards. func (s *Store) shardsSlice() []*Shard { - a := make([]*Shard, 0, len(s.shards)) - for _, sh := range s.shards { - a = append(a, sh) + a := make([]*Shard, 0, len(s.shardLocations)) + for _, sl := range s.shardLocations { + a = append(a, sl.Shard) } sort.Sort(Shards(a)) return a @@ -504,16 +548,15 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi } func (s *Store) deleteSeries(database string, seriesKeys []string) error { - db, ok := s.databaseIndexes[database] - if !ok { + if _, ok := s.databaseIndexes[database]; !ok { return influxql.ErrDatabaseNotFound(database) } - for _, sh := range s.shards { - if sh.index != db { + for _, sl := range s.shardLocations { + if !sl.IsDatabase(database) { continue } - if err := sh.DeleteSeries(seriesKeys); err != nil { + if err := sl.Shard.DeleteSeries(seriesKeys); err != nil { return err } } @@ -535,14 +578,14 @@ func (s *Store) periodicMaintenance() { } } -// performMaintenance will loop through the shars and tell them -// to perform any maintenance tasks. Those tasks should kick off -// their own goroutines if it's anything that could take time. +// performMaintenance loops through shards and executes any maintenance +// tasks. 
Those tasks should run in their own goroutines if they will +// take significant time. func (s *Store) performMaintenance() { s.mu.Lock() defer s.mu.Unlock() - for _, sh := range s.shards { - s.performMaintenanceOnShard(sh) + for _, sl := range s.shardLocations { + s.performMaintenanceOnShard(sl.Shard) } } @@ -625,12 +668,12 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error { default: } - sh, ok := s.shards[shardID] + sl, ok := s.shardLocations[shardID] if !ok { return ErrShardNotFound } - return sh.WritePoints(points) + return sl.Shard.WritePoints(points) } func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) { @@ -887,6 +930,30 @@ func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatem return rows, nil } +// shardLocation is a wrapper around a shard that provides extra +// information about which database and retention policy the shard +// belongs to. +// +// shardLocation is safe for use from multiple goroutines. +type shardLocation struct { + mu sync.RWMutex + Database string + RetentionPolicy string + Shard *Shard +} + +func (s *shardLocation) IsDatabase(db string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.Database == db +} + +func (s *shardLocation) IsRetentionPolicy(rp string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.RetentionPolicy == rp +} + // IsRetryable returns true if this error is temporary and could be retried func IsRetryable(err error) bool { if err == nil { diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 9ccab1e7e15..a6ee8f758c6 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -16,6 +16,75 @@ import ( "github.com/influxdata/influxdb/tsdb" ) +// Ensure the store can delete a retention policy and all shards under +// it. +func TestStore_DeleteRetentionPolicy(t *testing.T) { + s := MustOpenStore() + defer s.Close() + + // Create a new shard and verify that it exists. 
+ if err := s.CreateShard("db0", "rp0", 1); err != nil { + t.Fatal(err) + } else if sh := s.Shard(1); sh == nil { + t.Fatalf("expected shard") + } + + // Create a new shard under the same retention policy, and verify + // that it exists. + if err := s.CreateShard("db0", "rp0", 2); err != nil { + t.Fatal(err) + } else if sh := s.Shard(2); sh == nil { + t.Fatalf("expected shard") + } + + // Create a new shard under a different retention policy, and + // verify that it exists. + if err := s.CreateShard("db0", "rp1", 3); err != nil { + t.Fatal(err) + } else if sh := s.Shard(3); sh == nil { + t.Fatalf("expected shard") + } + + // Deleting the rp0 retention policy does not return an error. + if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + // It deletes the shards under that retention policy. + if sh := s.Shard(1); sh != nil { + t.Errorf("shard 1 was not deleted") + } + + if sh := s.Shard(2); sh != nil { + t.Errorf("shard 2 was not deleted") + } + + // It deletes the retention policy directory. + if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp { + t.Error("directory exists, but should have been removed") + } + + // It deletes the WAL retention policy directory. + if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp { + t.Error("directory exists, but should have been removed") + } + + // Reopen other shard and check it still exists. + if err := s.Reopen(); err != nil { + t.Error(err) + } else if sh := s.Shard(3); sh == nil { + t.Errorf("shard 3 does not exist") + } + + // It does not delete other retention policy directories. 
+ if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp { + t.Error("directory does not exist, but should") + } + if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp { + t.Error("directory does not exist, but should") + } +} + // Ensure the store can create a new shard. func TestStore_CreateShard(t *testing.T) { s := MustOpenStore() @@ -38,7 +107,7 @@ func TestStore_CreateShard(t *testing.T) { } // Reopen shard and recheck. - if s, err := ReopenStore(s); err != nil { + if err := s.Reopen(); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("expected shard(1)") @@ -60,7 +129,7 @@ func TestStore_DeleteShard(t *testing.T) { } // Reopen shard and recheck. - if s, err := ReopenStore(s); err != nil { + if err := s.Reopen(); err != nil { t.Fatal(err) } else if sh := s.Shard(1); sh == nil { t.Fatalf("shard exists") @@ -262,19 +331,13 @@ func MustOpenStore() *Store { return s } -// ReopenStore closes and reopens the store as a new store. -func ReopenStore(s *Store) (*Store, error) { +// Reopen closes and reopens the store as a new store. +func (s *Store) Reopen() error { if err := s.Store.Close(); err != nil { - return nil, err - } - - other := &Store{Store: tsdb.NewStore(s.Path())} - other.EngineOptions = s.EngineOptions - if err := other.Open(); err != nil { - return nil, err + return err } - - return other, nil + s.Store = tsdb.NewStore(s.Path()) + return s.Open() } // Close closes the store and removes the underlying data. @@ -341,3 +404,11 @@ func ParseTags(s string) influxql.Tags { } return influxql.NewTags(m) } + +func dirExists(path string) bool { + var err error + if _, err = os.Stat(path); err == nil { + return true + } + return !os.IsNotExist(err) +}