
Commit 9f75729
Merge pull request #9335 from influxdata/jl-race
Prevent race condition caused by WaitGroup re-use
joelegasse authored Jan 29, 2018
2 parents 079fe6e + 21a5823 commit 9f75729
Showing 2 changed files with 48 additions and 43 deletions.
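
The bug class named in the commit title is worth spelling out. The sync.WaitGroup docs allow re-use, but only if every Wait from the previous cycle has returned before Add starts the next one; an Add that can overlap an in-flight Wait at counter zero is a data race. Below is a minimal, self-contained sketch of the misuse this commit removes (illustrative only, not the engine code):

    package main

    import "sync"

    // Deliberately re-uses a single WaitGroup across cycles. Under
    // `go run -race`, the Add that starts a new cycle can overlap a
    // Wait still draining the previous one, and Go can report either
    // a data race or a "WaitGroup is reused before previous Wait has
    // returned" panic (both timing-dependent).
    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 1000; i++ {
            wg.Add(1) // next cycle begins...
            go func() { wg.Done() }()
            go wg.Wait() // ...while an earlier cycle's Wait may still be in flight
        }
        wg.Wait()
    }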
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@
 - [#9290](https://github.com/influxdata/influxdb/issues/9290): Fix regression to allow binary operations on literals.
 - [#9342](https://github.com/influxdata/influxdb/pull/9342): Fix data races in tcp.Mux and tcp.listener
 - [#9353](https://github.com/influxdata/influxdb/pull/9353): Fix panic in msgpack httpd WriteResponse error handler.
+- [#9335](https://github.com/influxdata/influxdb/pull/9335): Prevent race condition caused by WaitGroup re-use
 
 ## v1.4.3 [unreleased]
 
90 changes: 47 additions & 43 deletions tsdb/engine/tsm1/engine.go
@@ -137,12 +137,12 @@ type Engine struct {
     // decrease 'levelWorkers', and when it decreases to zero, level compactions will be started
     // back up again.
 
-    wg           sync.WaitGroup // waitgroup for active level compaction goroutines
-    done         chan struct{}  // channel to signal level compactions to stop
-    levelWorkers int            // Number of "workers" that expect compactions to be in a disabled state
+    wg           *sync.WaitGroup // waitgroup for active level compaction goroutines
+    done         chan struct{}   // channel to signal level compactions to stop
+    levelWorkers int             // Number of "workers" that expect compactions to be in a disabled state
 
-    snapDone chan struct{}  // channel to signal snapshot compactions to stop
-    snapWG   sync.WaitGroup // waitgroup for running snapshot compactions
+    snapDone chan struct{}   // channel to signal snapshot compactions to stop
+    snapWG   *sync.WaitGroup // waitgroup for running snapshot compactions
 
     id       uint64
     database string
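
The field change above, from sync.WaitGroup to *sync.WaitGroup, is what the rest of the diff builds on: each enable cycle installs a brand-new WaitGroup, goroutines capture the local pointer rather than the mutable field, and every waiter pins the generation it saw while holding the lock. A condensed sketch of that pattern (worker, start, and stop are hypothetical names, not the engine's API):

    package main

    import "sync"

    // worker models a restartable background task, mirroring the
    // enable/disable paths in the hunks below.
    type worker struct {
        mu   sync.Mutex
        wg   *sync.WaitGroup // replaced wholesale on every start
        done chan struct{}
    }

    func (w *worker) start() {
        w.mu.Lock()
        if w.done != nil { // already running
            w.mu.Unlock()
            return
        }
        done := make(chan struct{})
        w.done = done
        wg := new(sync.WaitGroup) // fresh WaitGroup per cycle: its Add
        wg.Add(1)                 // can never race a Wait left over
        w.wg = wg                 // from an earlier cycle's WaitGroup
        w.mu.Unlock()

        go func() {
            defer wg.Done() // uses the captured locals, not the mutable fields
            <-done          // stand-in for the real background loop
        }()
    }

    func (w *worker) stop() {
        w.mu.Lock()
        if w.done == nil { // not running
            w.mu.Unlock()
            return
        }
        close(w.done)
        wg := w.wg   // pin this cycle's WaitGroup under the lock
        w.done = nil // let the next start() begin a fresh cycle
        w.mu.Unlock()

        // A concurrent start() installs a new *sync.WaitGroup without
        // disturbing this wait.
        wg.Wait()
    }

    func main() {
        var w worker
        w.start()
        w.stop()
    }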
@@ -346,11 +346,12 @@ func (e *Engine) enableLevelCompactions(wait bool) {
     // last one to enable, start things back up
     e.Compactor.EnableCompactions()
     e.done = make(chan struct{})
-
-    e.wg.Add(1)
+    wg := new(sync.WaitGroup)
+    wg.Add(1)
+    e.wg = wg
     e.mu.Unlock()
 
-    go func() { defer e.wg.Done(); e.compact() }()
+    go func() { defer wg.Done(); e.compact(wg) }()
 }
 
 // disableLevelCompactions will stop level compactions before returning.
@@ -366,6 +367,7 @@ func (e *Engine) disableLevelCompactions(wait bool) {
 
     // Hold onto the current done channel so we can wait on it if necessary
     waitCh := e.done
+    wg := e.wg
 
     if old == 0 && e.done != nil {
         // It's possible we have closed the done channel and released the lock and another
@@ -384,7 +386,7 @@
         // Stop all background compaction goroutines
         close(e.done)
         e.mu.Unlock()
-        e.wg.Wait()
+        wg.Wait()
 
         // Signal that all goroutines have exited.
         e.mu.Lock()
@@ -402,7 +404,7 @@
     // We were not the first caller to disable compactions and they were in the process
     // of being disabled. Wait for them to complete before returning.
     <-waitCh
-    e.wg.Wait()
+    wg.Wait()
 }
 
 func (e *Engine) enableSnapshotCompactions() {
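
The wg := e.wg snapshot added above matters most for this late-waiter path: a second caller of disableLevelCompactions reaches <-waitCh, and with the old shared field, a concurrent enable could re-arm e.wg with Add(1) before the waiter's e.wg.Wait() ran, which is exactly the re-use race. Waiting on the pinned generation instead leaves any newer WaitGroup untouched. Extending the hypothetical worker sketch above:

    // stopAndWait models the "not the first disabler" path: pin both
    // the done channel and this generation's WaitGroup under the lock,
    // then block outside it.
    func (w *worker) stopAndWait() {
        w.mu.Lock()
        waitCh := w.done // may already be closed by the first disabler
        wg := w.wg       // pin the generation; a later start() swaps in a new one
        w.mu.Unlock()

        if waitCh == nil || wg == nil {
            return // nothing was running
        }
        <-waitCh  // first: the stop signal lands
        wg.Wait() // then: this generation's goroutines drain
    }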
@@ -423,36 +425,38 @@
 
     e.Compactor.EnableSnapshots()
     e.snapDone = make(chan struct{})
-    e.snapWG.Add(1)
+    wg := new(sync.WaitGroup)
+    wg.Add(1)
+    e.snapWG = wg
     e.mu.Unlock()
 
-    go func() { defer e.snapWG.Done(); e.compactCache() }()
+    go func() { defer wg.Done(); e.compactCache() }()
 }
 
 func (e *Engine) disableSnapshotCompactions() {
     e.mu.Lock()
-    var wait bool
-    if e.snapDone != nil {
-        // We may be in the process of stopping snapshots. See if the channel
-        // was closed.
-        select {
-        case <-e.snapDone:
-            e.mu.Unlock()
-            return
-        default:
-        }
-
-        close(e.snapDone)
-        e.Compactor.DisableSnapshots()
-        wait = true
+    if e.snapDone == nil {
+        e.mu.Unlock()
+        return
     }
 
+    // We may be in the process of stopping snapshots. See if the channel
+    // was closed.
+    select {
+    case <-e.snapDone:
+        e.mu.Unlock()
+        return
+    default:
+    }
+
+    // first one here, disable and wait for completion
+    close(e.snapDone)
+    e.Compactor.DisableSnapshots()
+    wg := e.snapWG
     e.mu.Unlock()
 
     // Wait for the snapshot goroutine to exit.
-    if wait {
-        e.snapWG.Wait()
-    }
+    wg.Wait()
 
     // Signal that the goroutines are exit and everything is stopped by setting
     // snapDone to nil.
@@ -1690,7 +1694,7 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
         time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
 }
 
-func (e *Engine) compact() {
+func (e *Engine) compact(wg *sync.WaitGroup) {
     t := time.NewTicker(time.Second)
     defer t.Stop()
 
@@ -1757,19 +1761,19 @@ func (e *Engine) compact() {
 
             switch level {
             case 1:
-                if e.compactHiPriorityLevel(level1Groups[0], 1, false) {
+                if e.compactHiPriorityLevel(level1Groups[0], 1, false, wg) {
                     level1Groups = level1Groups[1:]
                 }
             case 2:
-                if e.compactHiPriorityLevel(level2Groups[0], 2, false) {
+                if e.compactHiPriorityLevel(level2Groups[0], 2, false, wg) {
                     level2Groups = level2Groups[1:]
                 }
             case 3:
-                if e.compactLoPriorityLevel(level3Groups[0], 3, true) {
+                if e.compactLoPriorityLevel(level3Groups[0], 3, true, wg) {
                     level3Groups = level3Groups[1:]
                 }
             case 4:
-                if e.compactFull(level4Groups[0]) {
+                if e.compactFull(level4Groups[0], wg) {
                     level4Groups = level4Groups[1:]
                 }
             }
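
Note that compact now receives the cycle's WaitGroup explicitly and threads it into the per-level helpers below, so every compaction goroutine is charged to the generation that launched it; a loop that outlives a disable/enable flip can never Add to the newer cycle's WaitGroup. A sketch of the shape, with illustrative names:

    package main

    import (
        "sync"
        "time"
    )

    // compactLoop stands in for Engine.compact: it already holds one
    // unit on wg (added by the enable path before launch) and charges
    // every goroutine it spawns to that same, explicitly passed
    // generation.
    func compactLoop(done <-chan struct{}, wg *sync.WaitGroup) {
        t := time.NewTicker(100 * time.Millisecond)
        defer t.Stop()
        for {
            select {
            case <-done:
                return
            case <-t.C:
                wg.Add(1)   // counter >= 1 here (our own unit is held),
                go func() { // so this is legal even while a disabler blocks in Wait
                    defer wg.Done()
                    // one compaction pass, elided
                }()
            }
        }
    }

    func main() {
        done := make(chan struct{})
        wg := new(sync.WaitGroup)
        wg.Add(1) // the outer unit, taken before launch, as in enableLevelCompactions
        go func() { defer wg.Done(); compactLoop(done, wg) }()
        time.Sleep(350 * time.Millisecond) // let a few passes fire
        close(done)
        wg.Wait() // drains the loop and every pass it spawned
    }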
@@ -1786,7 +1790,7 @@
 
 // compactHiPriorityLevel kicks off compactions using the high priority policy. It returns
 // true if the compaction was started
-func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
+func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
     s := e.levelCompactionStrategy(grp, fast, level)
     if s == nil {
         return false
@@ -1796,9 +1800,9 @@ func (e *Engine) compactHiPriorityLevel(grp CompactionGroup, level int, fast boo
     if e.compactionLimiter.TryTake() {
         atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
 
-        e.wg.Add(1)
+        wg.Add(1)
         go func() {
-            defer e.wg.Done()
+            defer wg.Done()
             defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
 
             defer e.compactionLimiter.Release()
@@ -1815,7 +1819,7 @@
 
 // compactLoPriorityLevel kicks off compactions using the lo priority policy. It returns
 // the plans that were not able to be started
-func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool) bool {
+func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast bool, wg *sync.WaitGroup) bool {
     s := e.levelCompactionStrategy(grp, fast, level)
     if s == nil {
         return false
@@ -1824,9 +1828,9 @@ func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast boo
     // Try the lo priority limiter, otherwise steal a little from the high priority if we can.
     if e.compactionLimiter.TryTake() {
         atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], 1)
-        e.wg.Add(1)
+        wg.Add(1)
         go func() {
-            defer e.wg.Done()
+            defer wg.Done()
             defer atomic.AddInt64(&e.stats.TSMCompactionsActive[level-1], -1)
             defer e.compactionLimiter.Release()
             s.Apply()
@@ -1840,7 +1844,7 @@ func (e *Engine) compactLoPriorityLevel(grp CompactionGroup, level int, fast boo
 
 // compactFull kicks off full and optimize compactions using the lo priority policy. It returns
 // the plans that were not able to be started.
-func (e *Engine) compactFull(grp CompactionGroup) bool {
+func (e *Engine) compactFull(grp CompactionGroup, wg *sync.WaitGroup) bool {
     s := e.fullCompactionStrategy(grp, false)
     if s == nil {
         return false
@@ -1849,9 +1853,9 @@ func (e *Engine) compactFull(grp CompactionGroup) bool {
     // Try the lo priority limiter, otherwise steal a little from the high priority if we can.
     if e.compactionLimiter.TryTake() {
         atomic.AddInt64(&e.stats.TSMFullCompactionsActive, 1)
-        e.wg.Add(1)
+        wg.Add(1)
         go func() {
-            defer e.wg.Done()
+            defer wg.Done()
             defer atomic.AddInt64(&e.stats.TSMFullCompactionsActive, -1)
             defer e.compactionLimiter.Release()
             s.Apply()
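
One subtlety makes the Add calls in these helpers safe at all: they can run concurrently with a disabler blocked in Wait, which the sync package forbids only when the counter could be zero at that moment. It cannot be zero here, because compact itself still holds the unit that enableLevelCompactions added before launching it. A tiny standalone demonstration of that rule:

    package main

    import "sync"

    // Nested Add while another goroutine is blocked in Wait is fine as
    // long as the counter stays positive: the outer unit below is held
    // for the whole time the inner Adds fire.
    func main() {
        wg := new(sync.WaitGroup)
        wg.Add(1) // outer unit, taken before the worker starts
        go func() {
            defer wg.Done()
            for i := 0; i < 3; i++ {
                wg.Add(1) // counter >= 1 here: cannot race the Wait below
                go func() { defer wg.Done() }()
            }
        }()
        wg.Wait() // returns only after the loop and all inner units finish
    }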
