diff --git a/CHANGELOG.md b/CHANGELOG.md index 351e32537b4..b420cb622f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ With this release InfluxDB is moving to Go 1.5. - [#4043](https://github.com/influxdb/influxdb/pull/4043): Add stats and batching to openTSDB input - [#4042](https://github.com/influxdb/influxdb/pull/4042): Add pending batches control to batcher - [#4006](https://github.com/influxdb/influxdb/pull/4006): Add basic statistics for shards +- [#4072](https://github.com/influxdb/influxdb/pull/4072): Add statistics for the WAL. ### Bugfixes - [#4042](https://github.com/influxdb/influxdb/pull/4042): Set UDP input batching defaults as needed. diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index f5eb5e6d4e4..fb3556f6cad 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -25,6 +25,7 @@ import ( "encoding/binary" "encoding/json" "errors" + "expvar" "fmt" "io" "log" @@ -37,6 +38,7 @@ import ( "time" "github.com/golang/snappy" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/tsdb" ) @@ -67,6 +69,23 @@ const ( defaultFlushCheckInterval = time.Second ) +// Statistics maintained by the WAL +const ( + statPointsWriteReq = "points_write_req" + statPointsWrite = "points_write" + statFlush = "flush" + statAutoFlush = "auto_flush" + statIdleFlush = "idle_flush" + statMetadataFlush = "meta_flush" + statThresholdFlush = "threshold_flush" + statMemoryFlush = "mem_flush" + statSeriesFlushed = "series_flush" + statPointsFlushed = "points_flush" + statFlushDuration = "flush_duration" + statWriteFail = "write_fail" + statMemorySize = "mem_size" +) + // flushType indiciates why a flush and compaction are being run so the partition can // do the appropriate type of compaction type flushType int @@ -149,6 +168,9 @@ type Log struct { // LoggingEnabled specifies if detailed logs should be output LoggingEnabled bool + + // expvar-based statistics + statMap *expvar.Map } // IndexWriter is an interface for the indexed 
database the WAL flushes data to @@ -160,6 +182,11 @@ type IndexWriter interface { } func NewLog(path string) *Log { + // Configure expvar monitoring. It's OK to do this even if the log fails to open and + // should be done before any data could arrive for the log. + key := strings.Join([]string{"wal", path}, ":") + tags := map[string]string{"bind": path} + return &Log{ path: path, flush: make(chan int, 1), @@ -174,6 +201,7 @@ func NewLog(path string) *Log { ReadySeriesSize: tsdb.DefaultReadySeriesSize, flushCheckInterval: defaultFlushCheckInterval, logger: log.New(os.Stderr, "[wal] ", log.LstdFlags), + statMap: influxdb.NewStatistics(key, "wal", tags), } } @@ -234,6 +262,9 @@ func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { } func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { + l.statMap.Add(statPointsWriteReq, 1) + l.statMap.Add(statPointsWrite, int64(len(points))) + // persist the series and fields if there are any if err := l.writeSeriesAndFields(fields, series); err != nil { l.logger.Println("error writing series and fields:", err.Error()) @@ -246,6 +277,7 @@ func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.Measureme // Flush will force a flush on all paritions func (l *Log) Flush() error { + l.statMap.Add(statFlush, 1) l.mu.RLock() defer l.mu.RUnlock() @@ -549,6 +581,7 @@ func (l *Log) close() error { // triggerAutoFlush will flush and compact any partitions that have hit the thresholds for compaction func (l *Log) triggerAutoFlush() { + l.statMap.Add(statAutoFlush, 1) l.mu.RLock() defer l.mu.RUnlock() @@ -591,6 +624,8 @@ func (l *Log) autoflusher(closing chan struct{}) { // metadata from previous files to the index. After a sucessful write, the metadata files // will be removed. While the flush to index is happening we aren't blocked for new metadata writes. 
func (l *Log) flushMetadata() error { + l.statMap.Add(statMetadataFlush, 1) + // make sure there's actually something in the metadata file to flush size, err := func() (int64, error) { l.mu.Lock() @@ -699,7 +734,8 @@ type Partition struct { flushColdInterval time.Duration lastWriteTime time.Time - log *Log + log *Log + statMap *expvar.Map // Used for mocking OS calls os struct { @@ -718,6 +754,11 @@ type Partition struct { const partitionBufLen = 16 << 10 // 16kb func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) { + // Configure expvar monitoring. It's OK to do this even if the partition fails to open and + // should be done before any data could arrive for the partition. + key := strings.Join([]string{"partition", strconv.Itoa(int(id)), path}, ":") + tags := map[string]string{"partition": path, "id": strconv.Itoa(int(id))} + p := &Partition{ id: id, path: path, @@ -728,6 +769,7 @@ func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64 readySeriesSize: readySeriesSize, index: index, flushColdInterval: flushColdInterval, + statMap: influxdb.NewStatistics(key, "partition", tags), } p.os.OpenCompactionFile = os.OpenFile @@ -779,6 +821,7 @@ func (p *Partition) Write(points []tsdb.Point) error { }(); shouldCompact { go p.flushAndCompact(memoryFlush) } else if shouldFailWrite { + p.statMap.Add(statWriteFail, 1) return fmt.Errorf("write throughput too high. backoff and retry") } p.mu.Lock() @@ -963,19 +1006,26 @@ func (p *Partition) flushAndCompact(flush flushType) error { return nil } + // Logging and stats. 
+ ftype := "idle" + ftypeStat := statIdleFlush + if flush == thresholdFlush { + ftype = "threshold" + ftypeStat = statThresholdFlush + } else if flush == memoryFlush { + ftype = "memory" + ftypeStat = statMemoryFlush + } + pointCount := 0 + for _, a := range c.seriesToFlush { + pointCount += len(a) + } if p.log.LoggingEnabled { - ftype := "idle" - if flush == thresholdFlush { - ftype = "threshold" - } else if flush == memoryFlush { - ftype = "memory" - } - pointCount := 0 - for _, a := range c.seriesToFlush { - pointCount += len(a) - } p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id) } + p.statMap.Add(ftypeStat, 1) + p.statMap.Add(statPointsFlushed, int64(pointCount)) + p.statMap.Add(statSeriesFlushed, int64(len(c.seriesToFlush))) startTime := time.Now() // write the data to the index first @@ -983,8 +1033,11 @@ func (p *Partition) flushAndCompact(flush flushType) error { // if we can't write the index, we should just bring down the server hard panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error())) } + + writeDuration := time.Since(startTime) + p.statMap.AddFloat(statFlushDuration, writeDuration.Seconds()) if p.log.LoggingEnabled { - p.log.logger.Printf("write to index of partition %d took %s\n", p.id, time.Since(startTime)) + p.log.logger.Printf("write to index of partition %d took %s\n", p.id, writeDuration) } // clear the flush cache and reset the memory thresholds @@ -992,6 +1045,7 @@ func (p *Partition) flushAndCompact(flush flushType) error { p.flushCache = nil p.memorySize -= uint64(c.flushSize) p.mu.Unlock() + p.statMap.Add(statMemorySize, -int64(c.flushSize)) // ensure that we mark that compaction is no longer running defer func() { @@ -1137,6 +1191,7 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) { // Generate in-memory cache entry of . 
v := MarshalEntry(timestamp, data) p.memorySize += uint64(len(v)) + p.statMap.Add(statMemorySize, int64(len(v))) keystr := string(key) entry := p.cache[keystr]