diff --git a/CHANGELOG.md b/CHANGELOG.md index 36d9b002f81..189241967c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ With this release InfluxDB is moving to Go 1.5. - [#3886](https://github.com/influxdb/influxdb/pull/3886): Prevent write timeouts due to lock contention in WAL - [#3574](https://github.com/influxdb/influxdb/issues/3574): Querying data node causes panic - [#3913](https://github.com/influxdb/influxdb/issues/3913): Convert meta shard owners to objects +- [#3927](https://github.com/influxdb/influxdb/issues/3927): Add WAL lock to prevent timing lock contention ## v0.9.3 [2015-08-26] diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index b3627592072..e316424c4a0 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -102,6 +102,7 @@ type Log struct { path string flush chan int // signals a background flush on the given partition + flushLock sync.Mutex // serializes access to flushing to index flushCheckTimer *time.Timer // check this often to see if a background flush should happen flushCheckInterval time.Duration @@ -675,6 +676,10 @@ func (l *Log) flushMetadata() error { } } + // Lock before flushing to ensure timing is not spent waiting for lock. + l.flushLock.Lock() + defer l.flushLock.Unlock() + startTime := time.Now() if l.EnableLogging { l.logger.Printf("Flushing %d measurements and %d series to index\n", len(measurements), len(series)) @@ -1064,29 +1069,35 @@ func (p *Partition) flushAndCompact(flush flushType) error { return nil } - startTime := time.Now() - if p.log.EnableLogging { - ftype := "idle" - if flush == thresholdFlush { - ftype = "threshold" - } else if flush == memoryFlush { - ftype = "memory" - } - pointCount := 0 - for _, a := range c.seriesToFlush { - pointCount += len(a) + // Lock before flushing to ensure timing is not spent waiting for lock. + func() { + p.log.flushLock.Lock() + defer p.log.flushLock.Unlock() + + startTime := time.Now() + if p.log.EnableLogging { + ftype := "idle" + if flush == thresholdFlush { + ftype = "threshold" + } else if flush == memoryFlush { + ftype = "memory" + } + pointCount := 0 + for _, a := range c.seriesToFlush { + pointCount += len(a) + } + p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d. Compacting %d series\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id, c.countCompacting) } - p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d. Compacting %d series\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id, c.countCompacting) - } - // write the data to the index first - if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil { - // if we can't write the index, we should just bring down the server hard - panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error())) - } - if p.log.EnableLogging { - p.log.logger.Printf("write to index took of partition %d took %s\n", p.id, time.Since(startTime)) - } + // write the data to the index first + if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil { + // if we can't write the index, we should just bring down the server hard + panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error())) + } + if p.log.EnableLogging { + p.log.logger.Printf("write to index of partition %d took %s\n", p.id, time.Since(startTime)) + } + }() // clear the flush cache and reset the memory thresholds p.mu.Lock() @@ -1101,7 +1112,7 @@ func (p *Partition) flushAndCompact(flush flushType) error { p.mu.Unlock() }() - startTime = time.Now() + startTime := time.Now() err = p.compactFiles(c, flush) if p.log.EnableLogging { p.log.logger.Printf("compaction of partition %d took %s\n", p.id, time.Since(startTime))