Skip to content

Commit

Permalink
add WAL lock to prevent timing lock contention
Browse files Browse the repository at this point in the history
This commit adds a lock to the WAL log to prevent timing how long
it takes to obtain the Bolt write lock.
  • Loading branch information
benbjohnson committed Sep 1, 2015
1 parent 9067664 commit d52fe89
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3886](https://github.com/influxdb/influxdb/pull/3886): Prevent write timeouts due to lock contention in WAL
- [#3574](https://github.com/influxdb/influxdb/issues/3574): Querying data node causes panic
- [#3913](https://github.com/influxdb/influxdb/issues/3913): Convert meta shard owners to objects
- [#3927](https://github.com/influxdb/influxdb/issues/3927): Add WAL lock to prevent timing lock contention

## v0.9.3 [2015-08-26]

Expand Down
55 changes: 33 additions & 22 deletions tsdb/engine/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Log struct {
path string

flush chan int // signals a background flush on the given partition
flushLock sync.Mutex // serializes access to flushing to index
flushCheckTimer *time.Timer // check this often to see if a background flush should happen
flushCheckInterval time.Duration

Expand Down Expand Up @@ -675,6 +676,10 @@ func (l *Log) flushMetadata() error {
}
}

// Lock before flushing to ensure timing is not spent waiting for lock.
l.flushLock.Lock()
defer l.flushLock.Unlock()

startTime := time.Now()
if l.EnableLogging {
l.logger.Printf("Flushing %d measurements and %d series to index\n", len(measurements), len(series))
Expand Down Expand Up @@ -1064,29 +1069,35 @@ func (p *Partition) flushAndCompact(flush flushType) error {
return nil
}

startTime := time.Now()
if p.log.EnableLogging {
ftype := "idle"
if flush == thresholdFlush {
ftype = "threshold"
} else if flush == memoryFlush {
ftype = "memory"
}
pointCount := 0
for _, a := range c.seriesToFlush {
pointCount += len(a)
// Lock before flushing to ensure timing is not spent waiting for lock.
func() {
p.log.flushLock.Lock()
defer p.log.flushLock.Unlock()

startTime := time.Now()
if p.log.EnableLogging {
ftype := "idle"
if flush == thresholdFlush {
ftype = "threshold"
} else if flush == memoryFlush {
ftype = "memory"
}
pointCount := 0
for _, a := range c.seriesToFlush {
pointCount += len(a)
}
p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d. Compacting %d series\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id, c.countCompacting)
}
p.log.logger.Printf("Flush due to %s. Flushing %d series with %d points and %d bytes from partition %d. Compacting %d series\n", ftype, len(c.seriesToFlush), pointCount, c.flushSize, p.id, c.countCompacting)
}

// write the data to the index first
if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
// if we can't write the index, we should just bring down the server hard
panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error()))
}
if p.log.EnableLogging {
p.log.logger.Printf("write to index took of partition %d took %s\n", p.id, time.Since(startTime))
}
// write the data to the index first
if err := p.index.WriteIndex(c.seriesToFlush, nil, nil); err != nil {
// if we can't write the index, we should just bring down the server hard
panic(fmt.Sprintf("error writing the wal to the index: %s", err.Error()))
}
if p.log.EnableLogging {
p.log.logger.Printf("write to index of partition %d took %s\n", p.id, time.Since(startTime))
}
}()

// clear the flush cache and reset the memory thresholds
p.mu.Lock()
Expand All @@ -1101,7 +1112,7 @@ func (p *Partition) flushAndCompact(flush flushType) error {
p.mu.Unlock()
}()

startTime = time.Now()
startTime := time.Now()
err = p.compactFiles(c, flush)
if p.log.EnableLogging {
p.log.logger.Printf("compaction of partition %d took %s\n", p.id, time.Since(startTime))
Expand Down

0 comments on commit d52fe89

Please sign in to comment.