diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8966298aaa..0b346b4a9b4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@
 - [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup
 - [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped.
 - [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour
+- [#5844](https://github.com/influxdata/influxdb/pull/5844): Tag TSM engine stats with database and retention policy
 
 ### Bugfixes
 
diff --git a/services/copier/service_test.go b/services/copier/service_test.go
index e4f5e2bb969..a68a413b87b 100644
--- a/services/copier/service_test.go
+++ b/services/copier/service_test.go
@@ -164,10 +164,8 @@ func MustOpenShard(id uint64) *Shard {
 	sh := &Shard{
 		Shard: tsdb.NewShard(id,
 			tsdb.NewDatabaseIndex("db"),
-			tsdb.ShardConfig{
-				Path:    filepath.Join(path, "data"),
-				WALPath: filepath.Join(path, "wal"),
-			},
+			filepath.Join(path, "data"),
+			filepath.Join(path, "wal"),
 			tsdb.NewEngineOptions(),
 		),
 		path: path,
diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go
index fa096334eed..cd4d270c40e 100644
--- a/tsdb/engine/tsm1/cache.go
+++ b/tsdb/engine/tsm1/cache.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/influxdata/influxdb"
+	"github.com/influxdata/influxdb/tsdb"
 )
 
 var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
@@ -113,10 +114,15 @@ type Cache struct {
 // NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
 // Only used for engine caches, never for snapshots
 func NewCache(maxSize uint64, path string) *Cache {
+	db, rp := tsdb.DecodeStorePath(path)
 	c := &Cache{
-		maxSize:      maxSize,
-		store:        make(map[string]*entry),
-		statMap:      influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
+		maxSize: maxSize,
+		store:   make(map[string]*entry),
+		statMap: influxdb.NewStatistics(
+			"tsm1_cache:"+path,
+			"tsm1_cache",
+			map[string]string{"path": path, "database": db, "retentionPolicy": rp},
+		),
 		lastSnapshot: time.Now(),
 	}
 	c.UpdateAge()
diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go
index 616535f0f39..612ed56c92d 100644
--- a/tsdb/engine/tsm1/file_store.go
+++ b/tsdb/engine/tsm1/file_store.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/influxdata/influxdb"
+	"github.com/influxdata/influxdb/tsdb"
 )
 
 type TSMFile interface {
@@ -124,11 +125,16 @@ func (f FileStat) ContainsKey(key string) bool {
 }
 
 func NewFileStore(dir string) *FileStore {
+	db, rp := tsdb.DecodeStorePath(dir)
 	return &FileStore{
 		dir:          dir,
 		lastModified: time.Now(),
 		Logger:       log.New(os.Stderr, "[filestore] ", log.LstdFlags),
-		statMap:      influxdb.NewStatistics("tsm1_filestore:"+dir, "tsm1_filestore", map[string]string{"path": dir}),
+		statMap: influxdb.NewStatistics(
+			"tsm1_filestore:"+dir,
+			"tsm1_filestore",
+			map[string]string{"path": dir, "database": db, "retentionPolicy": rp},
+		),
 	}
 }
 
diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go
index 9fc63269ba8..2581b50b26a 100644
--- a/tsdb/engine/tsm1/wal.go
+++ b/tsdb/engine/tsm1/wal.go
@@ -18,6 +18,7 @@ import (
 	"github.com/golang/snappy"
 
 	"github.com/influxdata/influxdb"
+	"github.com/influxdata/influxdb/tsdb"
 )
 
 const (
@@ -89,6 +90,7 @@ type WAL struct {
 }
 
 func NewWAL(path string) *WAL {
+	db, rp := tsdb.DecodeStorePath(path)
 	return &WAL{
 		path: path,
 
@@ -98,7 +100,11 @@
 		logger:  log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
 		closing: make(chan struct{}),
-		statMap: influxdb.NewStatistics("tsm1_wal:"+path, "tsm1_wal", map[string]string{"path": path}),
+		statMap: influxdb.NewStatistics(
+			"tsm1_wal:"+path,
+			"tsm1_wal",
+			map[string]string{"path": path, "database": db, "retentionPolicy": rp},
+		),
 	}
 }
 
diff --git a/tsdb/shard.go b/tsdb/shard.go
index f1e637b440d..6edbd06262a 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -48,10 +48,13 @@ var (
 // Data can be split across many shards. The query engine in TSDB is responsible
 // for combining the output of many shards into a single query result.
 type Shard struct {
-	index *DatabaseIndex
-	id    uint64
+	index   *DatabaseIndex
+	path    string
+	walPath string
+	id      uint64
 
-	config ShardConfig
+	database        string
+	retentionPolicy string
 
 	engine  Engine
 	options EngineOptions
@@ -66,49 +69,38 @@ type Shard struct {
 	LogOutput io.Writer
 }
 
-// ShardConfig is passed to NewShard to specify the shard's
-// database, retention policy, and location of files on disk.
-type ShardConfig struct {
-	// Name of the database this shard belongs to
-	Database string
-
-	// Name of the retention policy this shard belongs to
-	RetentionPolicy string
-
-	// Path to this shard's location on disk
-	Path string
-
-	// Path to this shard's WAL location
-	WALPath string
-}
-
 // NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
-func NewShard(id uint64, index *DatabaseIndex, config ShardConfig, options EngineOptions) *Shard {
+func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
 	// Configure statistics collection.
-	key := fmt.Sprintf("shard:%s:%d", config.Path, id)
+	key := fmt.Sprintf("shard:%s:%d", path, id)
+	db, rp := DecodeStorePath(path)
 	tags := map[string]string{
-		"path":            config.Path,
+		"path":            path,
 		"id":              fmt.Sprintf("%d", id),
 		"engine":          options.EngineVersion,
-		"database":        config.Database,
-		"retentionPolicy": config.RetentionPolicy,
+		"database":        db,
+		"retentionPolicy": rp,
 	}
 	statMap := influxdb.NewStatistics(key, "shard", tags)
 
 	return &Shard{
 		index:   index,
 		id:      id,
-		config:  config,
+		path:    path,
+		walPath: walPath,
 		options: options,
 		measurementFields: make(map[string]*MeasurementFields),
 
+		database:        db,
+		retentionPolicy: rp,
+
 		statMap:   statMap,
 		LogOutput: os.Stderr,
 	}
 }
 
 // Path returns the path set on the shard when it was created.
-func (s *Shard) Path() string { return s.config.Path }
+func (s *Shard) Path() string { return s.path }
 
 // PerformMaintenance gets called periodically to have the engine perform
 // any maintenance tasks like WAL flushing and compaction
@@ -131,7 +123,7 @@ func (s *Shard) Open() error {
 		}
 
 		// Initialize underlying engine.
-		e, err := NewEngine(s.config.Path, s.config.WALPath, s.options)
+		e, err := NewEngine(s.path, s.walPath, s.options)
 		if err != nil {
 			return fmt.Errorf("new engine: %s", err)
 		}
@@ -175,9 +167,7 @@ func (s *Shard) close() error {
 
 // DiskSize returns the size on disk of this shard
 func (s *Shard) DiskSize() (int64, error) {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	stats, err := os.Stat(s.config.Path)
+	stats, err := os.Stat(s.path)
 	if err != nil {
 		return 0, err
 	}
diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go
index db78179426d..ac119bfe8ca 100644
--- a/tsdb/shard_test.go
+++ b/tsdb/shard_test.go
@@ -33,7 +33,7 @@ func TestShardWriteAndIndex(t *testing.T) {
 	opts := tsdb.NewEngineOptions()
 	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
 
-	sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
+	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
 	if err := sh.Open(); err != nil {
 		t.Fatalf("error opening shard: %s", err.Error())
 	}
@@ -76,7 +76,7 @@ func TestShardWriteAndIndex(t *testing.T) {
 	sh.Close()
 
 	index = tsdb.NewDatabaseIndex("db")
-	sh = tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
+	sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
 	if err := sh.Open(); err != nil {
 		t.Fatalf("error opening shard: %s", err.Error())
 	}
@@ -103,7 +103,7 @@ func TestShardWriteAddNewField(t *testing.T) {
 	opts := tsdb.NewEngineOptions()
 	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
 
-	sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
+	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
 	if err := sh.Open(); err != nil {
 		t.Fatalf("error opening shard: %s", err.Error())
 	}
@@ -258,7 +258,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
 		tmpDir, _ := ioutil.TempDir("", "shard_test")
 		tmpShard := path.Join(tmpDir, "shard")
 		tmpWal := path.Join(tmpDir, "wal")
-		shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
+		shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
 		shard.Open()
 
 		b.StartTimer()
@@ -294,7 +294,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
 	defer os.RemoveAll(tmpDir)
 	tmpShard := path.Join(tmpDir, "shard")
 	tmpWal := path.Join(tmpDir, "wal")
-	shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
+	shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
 	shard.Open()
 	defer shard.Close()
 	chunkedWrite(shard, points)
@@ -356,10 +356,8 @@ func NewShard() *Shard {
 	return &Shard{
 		Shard: tsdb.NewShard(0,
 			tsdb.NewDatabaseIndex("db"),
-			tsdb.ShardConfig{
-				Path:    filepath.Join(path, "data"),
-				WALPath: filepath.Join(path, "wal"),
-			},
+			filepath.Join(path, "data"),
+			filepath.Join(path, "wal"),
 			opt,
 		),
 		path: path,
diff --git a/tsdb/store.go b/tsdb/store.go
index 7b0a75c2097..032f4c4e786 100644
--- a/tsdb/store.go
+++ b/tsdb/store.go
@@ -141,13 +141,7 @@ func (s *Store) loadShards() error {
 					continue
 				}
 
-				sc := ShardConfig{
-					Path:            path,
-					WALPath:         walPath,
-					Database:        db,
-					RetentionPolicy: rp.Name(),
-				}
-				shard := NewShard(shardID, s.databaseIndexes[db], sc, s.EngineOptions)
+				shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
 				err = shard.Open()
 				if err != nil {
 					return fmt.Errorf("failed to open shard %d: %s", shardID, err)
@@ -258,13 +252,8 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
 		s.databaseIndexes[database] = db
 	}
 
-	sc := ShardConfig{
-		Path:            filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)),
-		WALPath:         walPath,
-		Database:        database,
-		RetentionPolicy: retentionPolicy,
-	}
-	shard := NewShard(shardID, db, sc, s.EngineOptions)
+	path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
+	shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
 	if err := shard.Open(); err != nil {
 		return err
 	}
@@ -294,11 +283,11 @@ func (s *Store) deleteShard(shardID uint64) error {
 		return err
 	}
 
-	if err := os.RemoveAll(sh.config.Path); err != nil {
+	if err := os.RemoveAll(sh.path); err != nil {
 		return err
 	}
 
-	if err := os.RemoveAll(sh.config.WALPath); err != nil {
+	if err := os.RemoveAll(sh.walPath); err != nil {
 		return err
 	}
 
@@ -322,7 +311,7 @@ func (s *Store) DeleteDatabase(name string) error {
 
 	// Close and delete all shards on the database.
 	for shardID, sh := range s.shards {
-		if sh.config.Database == name {
+		if sh.database == name {
 			// Delete the shard from disk.
 			if err := s.deleteShard(shardID); err != nil {
 				return err
@@ -351,7 +340,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
 	// Close and delete all shards under the retention policy on the
 	// database.
 	for shardID, sh := range s.shards {
-		if sh.config.Database == database && sh.config.RetentionPolicy == name {
+		if sh.database == database && sh.retentionPolicy == name {
 			// Delete the shard from disk.
 			if err := s.deleteShard(shardID); err != nil {
 				return err
@@ -390,7 +379,7 @@ func (s *Store) DeleteMeasurement(database, name string) error {
 
 	// Remove underlying data.
 	for _, sh := range s.shards {
-		if sh.config.Database != database {
+		if sh.database != database {
 			continue
 		}
 
@@ -479,7 +468,7 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
 		return fmt.Errorf("shard %d doesn't exist on this server", id)
 	}
 
-	path, err := relativePath(s.path, shard.config.Path)
+	path, err := relativePath(s.path, shard.path)
 	if err != nil {
 		return err
 	}
@@ -493,7 +482,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
 	if shard == nil {
 		return "", fmt.Errorf("shard %d doesn't exist on this server", id)
 	}
-	return relativePath(s.path, shard.config.Path)
+	return relativePath(s.path, shard.path)
 }
 
 // DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
@@ -568,7 +557,7 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error {
 	}
 
 	for _, sh := range s.shards {
-		if sh.config.Database != database {
+		if sh.database != database {
 			continue
 		}
 		if err := sh.DeleteSeries(seriesKeys); err != nil {
@@ -957,6 +946,20 @@ func IsRetryable(err error) bool {
 	return true
 }
 
+// DecodeStorePath extracts the database and retention policy names
+// from a given shard or WAL path.
+func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
+	// shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL
+
+	// Discard the last part of the path (the shard name or the wal name).
+	path, _ := filepath.Split(filepath.Clean(shardOrWALPath))
+
+	// Extract the database and retention policy.
+	path, rp := filepath.Split(filepath.Clean(path))
+	_, db := filepath.Split(filepath.Clean(path))
+	return db, rp
+}
+
 // relativePath will expand out the full paths passed in and return
 // the relative shard path from the store
 func relativePath(storePath, shardPath string) (string, error) {
diff --git a/tsdb/store_test.go b/tsdb/store_test.go
index 97c45bafb50..a9b61977c49 100644
--- a/tsdb/store_test.go
+++ b/tsdb/store_test.go
@@ -344,6 +344,7 @@ func (s *Store) Reopen() error {
 		return err
 	}
 	s.Store = tsdb.NewStore(s.Path())
+	s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
 	return s.Open()
 }
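
A minimal sketch (not part of the patch) of how the new tsdb.DecodeStorePath helper derives the database and retentionPolicy tags, assuming the base/:database/:retentionPolicy/:nameOfShardOrWAL layout documented in its comment; the paths and database/retention policy names below are hypothetical examples:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	// A shard data path: <base>/<database>/<retentionPolicy>/<shardID>.
	db, rp := tsdb.DecodeStorePath("/var/lib/influxdb/data/telegraf/default/1")
	fmt.Println(db, rp) // telegraf default

	// Per the comment in store.go, WAL paths follow the same layout, so the
	// cache, WAL, and filestore stats for a shard all pick up the same tags.
	db, rp = tsdb.DecodeStorePath("/var/lib/influxdb/wal/telegraf/default/1")
	fmt.Println(db, rp) // telegraf default
}

Deriving the names from the path is what lets NewCache, NewWAL, and NewFileStore tag their stats without threading a ShardConfig through every constructor, which is why the patch can delete that struct entirely.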