Skip to content

Commit

Permalink
Add stats for current and old WAL segment sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-rushakoff committed Feb 20, 2016
1 parent e76967e commit d99c09c
Showing 1 changed file with 57 additions and 0 deletions.
57 changes: 57 additions & 0 deletions tsdb/engine/tsm1/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tsm1
import (
"encoding/binary"
"errors"
"expvar"
"fmt"
"io"
"log"
Expand All @@ -16,6 +17,7 @@ import (
"time"

"github.com/golang/snappy"
"github.com/influxdata/influxdb"
)

const (
Expand Down Expand Up @@ -54,6 +56,12 @@ const (

var ErrWALClosed = fmt.Errorf("WAL closed")

// Statistics gathered by the WAL.
const (
statWALOldBytes = "oldSegmentsDiskBytes"
statWALCurrentBytes = "currentSegmentDiskBytes"
)

type WAL struct {
mu sync.RWMutex
lastWriteTime time.Time
Expand All @@ -76,6 +84,8 @@ type WAL struct {

// LoggingEnabled specifies if detailed logs should be output
LoggingEnabled bool

statMap *expvar.Map
}

func NewWAL(path string) *WAL {
Expand All @@ -87,6 +97,8 @@ func NewWAL(path string) *WAL {
SegmentSize: DefaultSegmentSize,
logger: log.New(os.Stderr, "[tsm1wal] ", log.LstdFlags),
closing: make(chan struct{}),

statMap: influxdb.NewStatistics("tsm1_wal:"+path, "tsm1_wal", map[string]string{"path": path}),
}
}

Expand Down Expand Up @@ -130,12 +142,26 @@ func (l *WAL) Open() error {

if stat.Size() == 0 {
os.Remove(lastSegment)
segments = segments[:len(segments)-1]
}
if err := l.newSegmentFile(); err != nil {
return err
}
}

var totalOldDiskSize int64
for _, seg := range segments {
stat, err := os.Stat(seg)
if err != nil {
return err
}

totalOldDiskSize += stat.Size()
}
sizeStat := new(expvar.Int)
sizeStat.Set(totalOldDiskSize)
l.statMap.Set(statWALOldBytes, sizeStat)

l.closing = make(chan struct{})

l.lastWriteTime = time.Now()
Expand Down Expand Up @@ -196,6 +222,26 @@ func (l *WAL) Remove(files []string) error {
for _, fn := range files {
os.RemoveAll(fn)
}

// Refresh the on-disk size stats
segments, err := segmentFileNames(l.path)
if err != nil {
return err
}

var totalOldDiskSize int64
for _, seg := range segments {
stat, err := os.Stat(seg)
if err != nil {
return err
}

totalOldDiskSize += stat.Size()
}
sizeStat := new(expvar.Int)
sizeStat.Set(totalOldDiskSize)
l.statMap.Set(statWALOldBytes, sizeStat)

return nil
}

Expand Down Expand Up @@ -240,6 +286,11 @@ func (l *WAL) writeToLog(entry WALEntry) (int, error) {
return -1, fmt.Errorf("error writing WAL entry: %v", err)
}

// Update stats for current segment size
curSize := new(expvar.Int)
curSize.Set(int64(l.currentSegmentWriter.size))
l.statMap.Set(statWALCurrentBytes, curSize)

l.lastWriteTime = time.Now()

return l.currentSegmentID, l.currentSegmentWriter.sync()
Expand Down Expand Up @@ -324,6 +375,7 @@ func (l *WAL) newSegmentFile() error {
if err := l.currentSegmentWriter.close(); err != nil {
return err
}
l.statMap.Add(statWALOldBytes, int64(l.currentSegmentWriter.size))
}

fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
Expand All @@ -333,6 +385,11 @@ func (l *WAL) newSegmentFile() error {
}
l.currentSegmentWriter = NewWALSegmentWriter(fd)

// Reset the current segment size stat
curSize := new(expvar.Int)
curSize.Set(0)
l.statMap.Set(statWALCurrentBytes, curSize)

return nil
}

Expand Down

0 comments on commit d99c09c

Please sign in to comment.