Merge pull request #4072 from influxdb/wal_stats
Add stats for the WAL
otoolep committed Sep 10, 2015
2 parents 5086ea4 + 3438bbb commit 6d028f9
Showing 2 changed files with 68 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#4043](https://github.com/influxdb/influxdb/pull/4043): Add stats and batching to openTSDB input
- [#4042](https://github.com/influxdb/influxdb/pull/4042): Add pending batches control to batcher
- [#4006](https://github.com/influxdb/influxdb/pull/4006): Add basic statistics for shards
- [#4072](https://github.com/influxdb/influxdb/pull/4072): Add statistics for the WAL.

### Bugfixes
- [#4042](https://github.com/influxdb/influxdb/pull/4042): Set UDP input batching defaults as needed.
79 changes: 67 additions & 12 deletions tsdb/engine/wal/wal.go
@@ -25,6 +25,7 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"log"
@@ -37,6 +38,7 @@ import (
"time"

"github.com/golang/snappy"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/tsdb"
)

@@ -67,6 +69,23 @@ const (
defaultFlushCheckInterval = time.Second
)

// Statistics maintained by the WAL
const (
statPointsWriteReq = "points_write_req"
statPointsWrite = "points_write"
statFlush = "flush"
statAutoFlush = "auto_flush"
statIdleFlush = "idle_flush"
statMetadataFlush = "meta_flush"
statThresholdFlush = "threshold_flush"
statMemoryFlush = "mem_flush"
statSeriesFlushed = "series_flush"
statPointsFlushed = "points_flush"
statFlushDuration = "flush_duration"
statWriteFail = "write_fail"
statMemorySize = "mem_size"
)

// flushType indicates why a flush and compaction are being run so the partition can
// do the appropriate type of compaction
type flushType int
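
Each of these keys names a counter inside an expvar.Map, the map that influxdb.NewStatistics returns further down. A minimal, self-contained sketch of the counter pattern these constants feed (the map name and the numbers here are invented for illustration):

    package main

    import (
        "expvar"
        "fmt"
    )

    func main() {
        // Publish a map of counters. expvar.NewMap panics if the name is
        // already taken, so each WAL and partition needs a unique key.
        stats := expvar.NewMap("wal:/tmp/example-wal")

        // expvar.Map is safe for concurrent use, so a hot write path can
        // bump counters without extra locking.
        stats.Add("points_write_req", 1)
        stats.Add("points_write", 500)
        stats.AddFloat("flush_duration", 0.042)

        stats.Do(func(kv expvar.KeyValue) {
            fmt.Printf("%s = %v\n", kv.Key, kv.Value)
        })
    }

Note that mem_size is used as a gauge rather than a monotonic counter: addToCache adds each entry's size and the post-flush path adds a negative delta, so the current value tracks resident cache bytes.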
@@ -149,6 +168,9 @@ type Log struct {

// LoggingEnabled specifies if detailed logs should be output
LoggingEnabled bool

// expvar-based statistics
statMap *expvar.Map
}

// IndexWriter is an interface for the indexed database the WAL flushes data to
@@ -160,6 +182,11 @@ type IndexWriter interface {
}

func NewLog(path string) *Log {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"wal", path}, ":")
tags := map[string]string{"bind": path}

return &Log{
path: path,
flush: make(chan int, 1),
@@ -174,6 +201,7 @@ func NewLog(path string) *Log {
ReadySeriesSize: tsdb.DefaultReadySeriesSize,
flushCheckInterval: defaultFlushCheckInterval,
logger: log.New(os.Stderr, "[wal] ", log.LstdFlags),
statMap: influxdb.NewStatistics(key, "wal", tags),
}
}

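influxdb.NewStatistics itself isn't part of this diff. Judging only from its call sites here, it plausibly publishes a fresh expvar.Map under the given key and records the name and tags as metadata; the sketch below is an assumption-laden stand-in, not the real helper:

    package main

    import "expvar"

    // newStatistics is a hypothetical stand-in for influxdb.NewStatistics,
    // inferred from its call sites in this commit. The real helper may store
    // the name and tags quite differently (e.g. for SHOW STATS reporting).
    func newStatistics(key, name string, tags map[string]string) *expvar.Map {
        m := expvar.NewMap(key) // panics if key was already published
        for k, v := range tags {
            s := new(expvar.String)
            s.Set(v)
            m.Set(k, s) // keep identifying tags alongside the counters
        }
        _ = name // presumably recorded as well; omitted in this sketch
        return m
    }

    func main() {
        wal := newStatistics("wal:/tmp/example-wal", "wal",
            map[string]string{"bind": "/tmp/example-wal"})
        wal.Add("flush", 1)
    }

The key scheme ("wal:<path>" here, "partition:<id>:<path>" below) lets each WAL and each of its partitions publish side by side without colliding.
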
@@ -234,6 +262,9 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
}

func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
l.statMap.Add(statPointsWriteReq, 1)
l.statMap.Add(statPointsWrite, int64(len(points)))

// persist the series and fields if there are any
if err := l.writeSeriesAndFields(fields, series); err != nil {
l.logger.Println("error writing series and fields:", err.Error())
@@ -246,6 +277,7 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {

// Flush will force a flush on all partitions
func (l *Log) Flush() error {
l.statMap.Add(statFlush, 1)
l.mu.RLock()
defer l.mu.RUnlock()

@@ -549,6 +581,7 @@ func (l *Log) close() error {

// triggerAutoFlush will flush and compact any partitions that have hit the thresholds for compaction
func (l *Log) triggerAutoFlush() {
l.statMap.Add(statAutoFlush, 1)
l.mu.RLock()
defer l.mu.RUnlock()

@@ -591,6 +624,8 @@ func (l *Log) autoflusher(closing chan struct{}) {
// metadata from previous files to the index. After a successful write, the metadata files
// will be removed. While the flush to index is happening we aren't blocked for new metadata writes.
func (l *Log) flushMetadata() error {
l.statMap.Add(statMetadataFlush, 1)

// make sure there's actually something in the metadata file to flush
size, err := func() (int64, error) {
l.mu.Lock()
@@ -699,7 +734,8 @@ type Partition struct {
flushColdInterval time.Duration
lastWriteTime time.Time

log *Log
log *Log
statMap *expvar.Map

// Used for mocking OS calls
os struct {
@@ -718,6 +754,11 @@
const partitionBufLen = 16 << 10 // 16kb

func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
// Configure expvar monitoring. It's OK to do this even if the service fails to open and
// should be done before any data could arrive for the service.
key := strings.Join([]string{"partition", strconv.Itoa(int(id)), path}, ":")
tags := map[string]string{"partition": path, "id": strconv.Itoa(int(id))}

p := &Partition{
id: id,
path: path,
@@ -728,6 +769,7 @@
readySeriesSize: readySeriesSize,
index: index,
flushColdInterval: flushColdInterval,
statMap: influxdb.NewStatistics(key, "partition", tags),
}

p.os.OpenCompactionFile = os.OpenFile
@@ -779,6 +821,7 @@ func (p *Partition) Write(points []tsdb.Point) error {
}(); shouldCompact {
go p.flushAndCompact(memoryFlush)
} else if shouldFailWrite {
p.statMap.Add(statWriteFail, 1)
return fmt.Errorf("write throughput too high. backoff and retry")
}
p.mu.Lock()
@@ -963,35 +1006,46 @@ func (p *Partition) flushAndCompact(flush flushType) error {
return nil
}

// Logging and stats.
ftype := "idle"
ftypeStat := statIdleFlush
if flush == thresholdFlush {
ftype = "threshold"
ftypeStat = statThresholdFlush
} else if flush == memoryFlush {
ftype = "memory"
ftypeStat = statMemoryFlush
}
pointCount := 0
for _, a := range c.seriesToFlush {
pointCount += len(a)
}
if p.log.LoggingEnabled {
ftype := "idle"
if flush == thresholdFlush {
ftype = "threshold"
} else if flush == memoryFlush {
ftype = "memory"
}
pointCount := 0
for _, a := range c.seriesToFlush {
pointCount += len(a)
}
p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id)
}
p.statMap.Add(ftypeStat, 1)
p.statMap.Add(statPointsFlushed, int64(pointCount))
p.statMap.Add(statSeriesFlushed, int64(len(c.seriesToFlush)))

startTime := time.Now()
// write the data to the index first
if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
// if we can't write the index, we should just bring down the server hard
panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error()))
}

writeDuration := time.Since(startTime)
p.statMap.AddFloat(statFlushDuration, writeDuration.Seconds())
if p.log.LoggingEnabled {
p.log.logger.Printf("write to index of partition %d took %s\n", p.id, time.Since(startTime))
p.log.logger.Printf("write to index of partition %d took %s\n", p.id, writeDuration)
}

// clear the flush cache and reset the memory thresholds
p.mu.Lock()
p.flushCache = nil
p.memorySize -= uint64(c.flushSize)
p.mu.Unlock()
p.statMap.Add(statMemorySize, -int64(c.flushSize))

// ensure that we mark that compaction is no longer running
defer func() {
@@ -1137,6 +1191,7 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
// Generate in-memory cache entry of <timestamp,data>.
v := MarshalEntry(timestamp, data)
p.memorySize += uint64(len(v))
p.statMap.Add(statMemorySize, int64(len(v)))
keystr := string(key)

entry := p.cache[keystr]
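
Since expvar publishes every registered map on the process-wide /debug/vars page of the default HTTP mux, the new counters can be scraped with plain HTTP and JSON once the daemon serves that endpoint. A quick reader sketch; the port and the assumption that influxd exposes /debug/vars are mine, not the commit's:

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
        "strings"
    )

    func main() {
        resp, err := http.Get("http://localhost:8086/debug/vars")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        var vars map[string]json.RawMessage
        if err := json.NewDecoder(resp.Body).Decode(&vars); err != nil {
            panic(err)
        }

        // Print only the WAL and partition stat maps added by this commit.
        for key, raw := range vars {
            if strings.HasPrefix(key, "wal:") || strings.HasPrefix(key, "partition:") {
                fmt.Printf("%s: %s\n", key, raw)
            }
        }
    }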