Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tag TSM stats with database and retention policy #5844

Merged
merged 1 commit into from
Feb 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup
- [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped.
- [#5758](https://github.com/influxdata/influxdb/pull/5758): TSM engine stats for cache, WAL, and filestore. Thanks @jonseymour
- [#5844](https://github.com/influxdata/influxdb/pull/5844): Tag TSM engine stats with database and retention policy

### Bugfixes

Expand Down
6 changes: 2 additions & 4 deletions services/copier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,8 @@ func MustOpenShard(id uint64) *Shard {
sh := &Shard{
Shard: tsdb.NewShard(id,
tsdb.NewDatabaseIndex("db"),
tsdb.ShardConfig{
Path: filepath.Join(path, "data"),
WALPath: filepath.Join(path, "wal"),
},
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
tsdb.NewEngineOptions(),
),
path: path,
Expand Down
12 changes: 9 additions & 3 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)

var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
Expand Down Expand Up @@ -113,10 +114,15 @@ type Cache struct {
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
// Only used for engine caches, never for snapshots
func NewCache(maxSize uint64, path string) *Cache {
db, rp := tsdb.DecodeStorePath(path)
c := &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics(
"tsm1_cache:"+path,
"tsm1_cache",
map[string]string{"path": path, "database": db, "retentionPolicy": rp},
),
lastSnapshot: time.Now(),
}
c.UpdateAge()
Expand Down
8 changes: 7 additions & 1 deletion tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)

type TSMFile interface {
Expand Down Expand Up @@ -124,11 +125,16 @@ func (f FileStat) ContainsKey(key string) bool {
}

func NewFileStore(dir string) *FileStore {
db, rp := tsdb.DecodeStorePath(dir)
return &FileStore{
dir: dir,
lastModified: time.Now(),
Logger: log.New(os.Stderr, "[filestore] ", log.LstdFlags),
statMap: influxdb.NewStatistics("tsm1_filestore:"+dir, "tsm1_filestore", map[string]string{"path": dir}),
statMap: influxdb.NewStatistics(
"tsm1_filestore:"+dir,
"tsm1_filestore",
map[string]string{"path": dir, "database": db, "retentionPolicy": rp},
),
}
}

Expand Down
8 changes: 7 additions & 1 deletion tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/golang/snappy"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb"
)

const (
Expand Down Expand Up @@ -89,6 +90,7 @@ type WAL struct {
}

func NewWAL(path string) *WAL {
db, rp := tsdb.DecodeStorePath(path)
return &WAL{
path: path,

Expand All @@ -98,7 +100,11 @@ func NewWAL(path string) *WAL {
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),

statMap: influxdb.NewStatistics("tsm1_wal:"+path, "tsm1_wal", map[string]string{"path": path}),
statMap: influxdb.NewStatistics(
"tsm1_wal:"+path,
"tsm1_wal",
map[string]string{"path": path, "database": db, "retentionPolicy": rp},
),
}
}

Expand Down
50 changes: 20 additions & 30 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ var (
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
index *DatabaseIndex
id uint64
index *DatabaseIndex
path string
walPath string
id uint64

config ShardConfig
database string
retentionPolicy string

engine Engine
options EngineOptions
Expand All @@ -66,49 +69,38 @@ type Shard struct {
LogOutput io.Writer
}

// ShardConfig is passed to NewShard to specify the shard's
// database, retention policy, and location of files on disk.
type ShardConfig struct {
// Name of the database this shard belongs to
Database string

// Name of the retention policy this shard belongs to
RetentionPolicy string

// Path to this shard's location on disk
Path string

// Path to this shard's WAL location
WALPath string
}

// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, index *DatabaseIndex, config ShardConfig, options EngineOptions) *Shard {
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
// Configure statistics collection.
key := fmt.Sprintf("shard:%s:%d", config.Path, id)
key := fmt.Sprintf("shard:%s:%d", path, id)
db, rp := DecodeStorePath(path)
tags := map[string]string{
"path": config.Path,
"path": path,
"id": fmt.Sprintf("%d", id),
"engine": options.EngineVersion,
"database": config.Database,
"retentionPolicy": config.RetentionPolicy,
"database": db,
"retentionPolicy": rp,
}
statMap := influxdb.NewStatistics(key, "shard", tags)

return &Shard{
index: index,
id: id,
config: config,
path: path,
walPath: walPath,
options: options,
measurementFields: make(map[string]*MeasurementFields),

database: db,
retentionPolicy: rp,

statMap: statMap,
LogOutput: os.Stderr,
}
}

// Path returns the path set on the shard when it was created.
func (s *Shard) Path() string { return s.config.Path }
func (s *Shard) Path() string { return s.path }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this doesn't need a read lock on s.mu, I think it should either have one, or the one in DiskSize should be removed. There are no concurrent writes to s.path, so technically no lock is currently needed, but it would be better to be consistent either way.

If it's decided to add a read lock to Path() then you should also move path under mu in the Shard type definition.


// PerformMaintenance gets called periodically to have the engine perform
// any maintenance tasks like WAL flushing and compaction
Expand All @@ -131,7 +123,7 @@ func (s *Shard) Open() error {
}

// Initialize underlying engine.
e, err := NewEngine(s.config.Path, s.config.WALPath, s.options)
e, err := NewEngine(s.path, s.walPath, s.options)
if err != nil {
return fmt.Errorf("new engine: %s", err)
}
Expand Down Expand Up @@ -175,9 +167,7 @@ func (s *Shard) close() error {

// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
stats, err := os.Stat(s.config.Path)
stats, err := os.Stat(s.path)
if err != nil {
return 0, err
}
Expand Down
16 changes: 7 additions & 9 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestShardWriteAndIndex(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestShardWriteAndIndex(t *testing.T) {
sh.Close()

index = tsdb.NewDatabaseIndex("db")
sh = tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
sh = tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
Expand All @@ -103,7 +103,7 @@ func TestShardWriteAddNewField(t *testing.T) {
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(tmpDir, "wal")

sh := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, opts)
sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
if err := sh.Open(); err != nil {
t.Fatalf("error opening shard: %s", err.Error())
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func benchmarkWritePoints(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt int) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()

b.StartTimer()
Expand Down Expand Up @@ -294,7 +294,7 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
defer os.RemoveAll(tmpDir)
tmpShard := path.Join(tmpDir, "shard")
tmpWal := path.Join(tmpDir, "wal")
shard := tsdb.NewShard(1, index, tsdb.ShardConfig{Path: tmpShard, WALPath: tmpWal}, tsdb.NewEngineOptions())
shard := tsdb.NewShard(1, index, tmpShard, tmpWal, tsdb.NewEngineOptions())
shard.Open()
defer shard.Close()
chunkedWrite(shard, points)
Expand Down Expand Up @@ -356,10 +356,8 @@ func NewShard() *Shard {
return &Shard{
Shard: tsdb.NewShard(0,
tsdb.NewDatabaseIndex("db"),
tsdb.ShardConfig{
Path: filepath.Join(path, "data"),
WALPath: filepath.Join(path, "wal"),
},
filepath.Join(path, "data"),
filepath.Join(path, "wal"),
opt,
),
path: path,
Expand Down
47 changes: 25 additions & 22 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,7 @@ func (s *Store) loadShards() error {
continue
}

sc := ShardConfig{
Path: path,
WALPath: walPath,
Database: db,
RetentionPolicy: rp.Name(),
}
shard := NewShard(shardID, s.databaseIndexes[db], sc, s.EngineOptions)
shard := NewShard(shardID, s.databaseIndexes[db], path, walPath, s.EngineOptions)
err = shard.Open()
if err != nil {
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
Expand Down Expand Up @@ -258,13 +252,8 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
s.databaseIndexes[database] = db
}

sc := ShardConfig{
Path: filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)),
WALPath: walPath,
Database: database,
RetentionPolicy: retentionPolicy,
}
shard := NewShard(shardID, db, sc, s.EngineOptions)
path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10))
shard := NewShard(shardID, db, path, walPath, s.EngineOptions)
if err := shard.Open(); err != nil {
return err
}
Expand Down Expand Up @@ -294,11 +283,11 @@ func (s *Store) deleteShard(shardID uint64) error {
return err
}

if err := os.RemoveAll(sh.config.Path); err != nil {
if err := os.RemoveAll(sh.path); err != nil {
return err
}

if err := os.RemoveAll(sh.config.WALPath); err != nil {
if err := os.RemoveAll(sh.walPath); err != nil {
return err
}

Expand All @@ -322,7 +311,7 @@ func (s *Store) DeleteDatabase(name string) error {

// Close and delete all shards on the database.
for shardID, sh := range s.shards {
if sh.config.Database == name {
if sh.database == name {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
Expand Down Expand Up @@ -351,7 +340,7 @@ func (s *Store) DeleteRetentionPolicy(database, name string) error {
// Close and delete all shards under the retention policy on the
// database.
for shardID, sh := range s.shards {
if sh.config.Database == database && sh.config.RetentionPolicy == name {
if sh.database == database && sh.retentionPolicy == name {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
Expand Down Expand Up @@ -390,7 +379,7 @@ func (s *Store) DeleteMeasurement(database, name string) error {

// Remove underlying data.
for _, sh := range s.shards {
if sh.config.Database != database {
if sh.database != database {
continue
}

Expand Down Expand Up @@ -479,7 +468,7 @@ func (s *Store) BackupShard(id uint64, since time.Time, w io.Writer) error {
return fmt.Errorf("shard %d doesn't exist on this server", id)
}

path, err := relativePath(s.path, shard.config.Path)
path, err := relativePath(s.path, shard.path)
if err != nil {
return err
}
Expand All @@ -493,7 +482,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
if shard == nil {
return "", fmt.Errorf("shard %d doesn't exist on this server", id)
}
return relativePath(s.path, shard.config.Path)
return relativePath(s.path, shard.path)
}

// DeleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
Expand Down Expand Up @@ -568,7 +557,7 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error {
}

for _, sh := range s.shards {
if sh.config.Database != database {
if sh.database != database {
continue
}
if err := sh.DeleteSeries(seriesKeys); err != nil {
Expand Down Expand Up @@ -957,6 +946,20 @@ func IsRetryable(err error) bool {
return true
}

// DecodeStorePath extracts the database and retention policy names
// from a given shard or WAL path.
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm overlooking something here, but why can't you just split on / and take the correct elements from the resulting []string?

parts := strings.Split(pth, "/")
// check parts is of appropriate length.
return parts[len(parts)-3], parts[len(parts)-2]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed filepath was better suited to the job since it handles any unusual cases involving a doubled-up separator, e.g. /some/path//db/rp/.... Maybe it's not possible to hit that situation in our code paths, but I chose to err on the side of being defensive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea I had behind DecodeStorePath (DecodeThingy) is that it describes how to go from a path to db, rp, and potentially sID. If we change how we store any of these elements then this function and the alternative EncodeStorePath should be modified to make that happen.

I guess the issue here is that you haven't implemented the encode version of this function so I can understand your point about being defensive (someone might update how we store dbs/rps and not update this function).

Either way, I can't think of a case where your current approach wouldn't also break if things were changed.

// shardOrWALPath format: /maybe/absolute/base/then/:database/:retentionPolicy/:nameOfShardOrWAL

// Discard the last part of the path (the shard name or the wal name).
path, _ := filepath.Split(filepath.Clean(shardOrWALPath))

// Extract the database and retention policy.
path, rp := filepath.Split(filepath.Clean(path))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably not necessary to call clean multiple times. Should you just use filepath.SplitList?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary. See golang/go#9928 for a slightly more detailed discussion.

filepath.SplitList is meant to parse out colon separated list of paths like you would see in your $PATH environment variable.

_, db := filepath.Split(filepath.Clean(path))
return db, rp
}

// relativePath will expand out the full paths passed in and return
// the relative shard path from the store
func relativePath(storePath, shardPath string) (string, error) {
Expand Down
1 change: 1 addition & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (s *Store) Reopen() error {
return err
}
s.Store = tsdb.NewStore(s.Path())
s.EngineOptions.Config.WALDir = filepath.Join(s.Path(), "wal")
return s.Open()
}

Expand Down