Skip to content

Commit

Permalink
Memoize output of FileStore.Stats
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard authored and jwilder committed Oct 24, 2016
1 parent a16c188 commit 0ee093f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.
- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors.
- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series.
- [#7480](https://github.com/influxdata/influxdb/pull/7480): Improve compaction planning performance by caching tsm file stats.

### Bugfixes

Expand Down
31 changes: 26 additions & 5 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ const (
type FileStore struct {
mu sync.RWMutex
lastModified time.Time
// Most recently known file stats. If nil then stats will need to be
// recalculated
lastFileStats []FileStat

currentGeneration int
dir string
Expand Down Expand Up @@ -258,6 +261,7 @@ func (f *FileStore) Add(files ...TSMFile) {
for _, file := range files {
atomic.AddInt64(&f.stats.DiskBytes, int64(file.Size()))
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.files = append(f.files, files...)
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
Expand Down Expand Up @@ -285,6 +289,7 @@ func (f *FileStore) Remove(paths ...string) {
atomic.AddInt64(&f.stats.DiskBytes, -int64(file.Size()))
}
}
f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
Expand Down Expand Up @@ -449,6 +454,7 @@ func (f *FileStore) Close() error {
file.Close()
}

f.lastFileStats = nil
f.files = nil
atomic.StoreInt64(&f.stats.FileCount, 0)
return nil
Expand Down Expand Up @@ -485,13 +491,27 @@ func (f *FileStore) KeyCursor(key string, t int64, ascending bool) *KeyCursor {

func (f *FileStore) Stats() []FileStat {
f.mu.RLock()
defer f.mu.RUnlock()
stats := make([]FileStat, len(f.files))
for i, fd := range f.files {
stats[i] = fd.Stats()
if len(f.lastFileStats) > 0 {
defer f.mu.RUnlock()
return f.lastFileStats
}
f.mu.RUnlock()

return stats
// The file stats cache is invalid due to changes to files. Need to
// recalculate.
f.mu.Lock()

// If lastFileStats's capacity is far away from the number of entries
// we need to add, then we'll reallocate.
if cap(f.lastFileStats) < len(f.files)/2 {
f.lastFileStats = make([]FileStat, 0, len(f.files))
}

for _, fd := range f.files {
f.lastFileStats = append(f.lastFileStats, fd.Stats())
}
defer f.mu.Unlock()
return f.lastFileStats
}

func (f *FileStore) Replace(oldFiles, newFiles []string) error {
Expand Down Expand Up @@ -598,6 +618,7 @@ func (f *FileStore) Replace(oldFiles, newFiles []string) error {
// Tell the purger about our in-use files we need to remove
f.purger.add(inuse)

f.lastFileStats = f.lastFileStats[:0] // Will need to be recalculated on next call to Stats.
f.files = active
sort.Sort(tsmReaders(f.files))
atomic.StoreInt64(&f.stats.FileCount, int64(len(f.files)))
Expand Down
95 changes: 94 additions & 1 deletion tsdb/engine/tsm1/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -2188,7 +2190,7 @@ func TestFileStore_Stats(t *testing.T) {
keyValues{"mem", []tsm1.Value{tsm1.NewValue(0, 1.0)}},
}

_, err := newFileDir(dir, data...)
files, err := newFileDir(dir, data...)
if err != nil {
fatal(t, "creating test files", err)
}
Expand All @@ -2203,6 +2205,63 @@ func TestFileStore_Stats(t *testing.T) {
if got, exp := len(stats), 3; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}

// Another call should result in the same stats being returned.
if got, exp := fs.Stats(), stats; !reflect.DeepEqual(got, exp) {
t.Fatalf("got %v, exp %v", got, exp)
}

// Removing one of the files should invalidate the cache.
fs.Remove(files[0])
if got, exp := len(fs.Stats()), 2; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}

// Write a new TSM file that that is not open
newFile := MustWriteTSM(dir, 4, map[string][]tsm1.Value{
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
})

replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension
if err := os.Rename(newFile, replacement); err != nil {

}
// Replace 3 w/ 1
if err := fs.Replace(files, []string{replacement}); err != nil {
t.Fatalf("replace: %v", err)
}

var found bool
stats = fs.Stats()
for _, stat := range stats {
if strings.HasSuffix(stat.Path, "-foo") {
found = true
}
}

if !found {
t.Fatalf("Didn't find %s in stats: %v", "foo", stats)
}

newFile = MustWriteTSM(dir, 5, map[string][]tsm1.Value{
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)},
})

fd, err := os.Open(newFile)
if err != nil {
t.Fatalf("open file: %v", err)
}

f, err := tsm1.NewTSMReader(fd)
if err != nil {
t.Fatalf("new reader: %v", err)
}

// Adding some files should invalidate the cache.
fs.Add(f)
if got, exp := len(fs.Stats()), 2; got != exp {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
}
}

func TestFileStore_CreateSnapshot(t *testing.T) {
Expand Down Expand Up @@ -2363,3 +2422,37 @@ func fatal(t *testing.T, msg string, err error) {
func tsmFileName(id int) string {
return fmt.Sprintf("%09d-%09d.tsm", id, 1)
}

var fsResult []tsm1.FileStat

func BenchmarkFileStore_Stats(b *testing.B) {
dir := MustTempDir()
defer os.RemoveAll(dir)

// Create some TSM files...
data := make([]keyValues, 0, 1000)
for i := 0; i < 1000; i++ {
data = append(data, keyValues{"cpu", []tsm1.Value{tsm1.NewValue(0, 1.0)}})
}

_, err := newFileDir(dir, data...)
if err != nil {
b.Fatalf("creating benchmark files %v", err)
}

fs := tsm1.NewFileStore(dir)
if !testing.Verbose() {
fs.SetLogOutput(ioutil.Discard)
}

if err := fs.Open(); err != nil {
b.Fatalf("opening file store %v", err)
}
defer fs.Close()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
fsResult = fs.Stats()
}
}

0 comments on commit 0ee093f

Please sign in to comment.