Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tsm: cache: add cache throughput related statistics. #5762

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"sort"
"sync"
"time"

"github.com/influxdata/influxdb"
)
Expand Down Expand Up @@ -67,8 +68,17 @@ func (e *entry) deduplicate() {

// Statistics gathered by the Cache.
const (
statCacheMemoryBytes = "memBytes" // Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // Size of on-disk snapshots in bytes
// levels - point in time measures

statCacheMemoryBytes = "memBytes" // level: Size of in-memory cache in bytes
statCacheDiskBytes = "diskBytes" // level: Size of on-disk snapshots in bytes
statSnapshots = "snapshotCount" // level: Number of active snapshots.
statCacheAgeMs = "cacheAgeMs" // level: Number of milliseconds since cache was last snapshoted at sample time

// counters - accumulative measures

statCachedBytes = "cachedBytes" // counter: Total number of bytes written into snapshots.
statWALCompactionTimeMs = "WALCompactionTimeMs" // counter: Total number of milliseconds spent compacting snapshots
)

// Cache maintains an in-memory store of Values for a set of keys.
Expand All @@ -84,7 +94,8 @@ type Cache struct {
snapshots []*Cache
snapshotsSize uint64

statMap *expvar.Map
statMap *expvar.Map
lastSnapshot time.Time

// path is only used to track stats
path string
Expand All @@ -93,10 +104,11 @@ type Cache struct {
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
func NewCache(maxSize uint64, path string) *Cache {
return &Cache{
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
path: path,
maxSize: maxSize,
store: make(map[string]*entry),
statMap: influxdb.NewStatistics("tsm1_cache:"+path, "tsm1_cache", map[string]string{"path": path}),
path: path,
lastSnapshot: time.Now(),
}
}

Expand Down Expand Up @@ -167,6 +179,7 @@ func (c *Cache) Snapshot() *Cache {

c.store = make(map[string]*entry)
c.size = 0
c.lastSnapshot = time.Now()

c.snapshots = append(c.snapshots, snapshot)
c.snapshotsSize += snapshot.size
Expand All @@ -180,6 +193,9 @@ func (c *Cache) Snapshot() *Cache {
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)

c.statMap.Add(statCachedBytes, int64(snapshot.size))
c.statMap.Add(statSnapshots, 1)

return snapshot
}

Expand Down Expand Up @@ -209,6 +225,8 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
diskSizeStat := new(expvar.Int)
diskSizeStat.Set(int64(c.snapshotsSize))
c.statMap.Set(statCacheDiskBytes, diskSizeStat)

c.statMap.Add(statSnapshots, -1)
}

// Size returns the number of point-calcuated bytes the cache currently uses.
Expand Down Expand Up @@ -425,3 +443,17 @@ func (cl *CacheLoader) Load(cache *Cache) error {
}
return nil
}

// Updates the age statistic
func (c *Cache) UpdateAge() {
c.mu.RLock()
defer c.mu.RUnlock()
ageStat := new(expvar.Int)
ageStat.Set(int64(time.Now().Sub(c.lastSnapshot) / time.Millisecond))
c.statMap.Set(statCacheAgeMs, ageStat)
}

// Updates WAL compaction time statistic
func (c *Cache) UpdateCompactTime(d time.Duration) {
c.statMap.Add(statWALCompactionTimeMs, int64(d/time.Millisecond))
}
13 changes: 13 additions & 0 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,22 @@ func (e *Engine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemen
func (e *Engine) WriteSnapshot() error {
// Lock and grab the cache snapshot along with all the closed WAL
// filenames associated with the snapshot

var started *time.Time

defer func() {
if started != nil {
e.Cache.UpdateCompactTime(time.Now().Sub(*started))
}
}()

closedFiles, snapshot, compactor, err := func() ([]string, *Cache, *Compactor, error) {
e.mu.Lock()
defer e.mu.Unlock()

now := time.Now()
started = &now

if err := e.WAL.CloseSegment(); err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -477,6 +489,7 @@ func (e *Engine) compactCache() {
return

default:
e.Cache.UpdateAge()
if e.ShouldCompactCache(e.WAL.LastWriteTime()) {
err := e.WriteSnapshot()
if err != nil {
Expand Down