Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible deadlock #5963

Merged
merged 4 commits into from
Mar 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## v0.12.0 [unreleased]

### Features

### Bugfixes

- [#5963](https://github.com/influxdata/influxdb/pull/5963): Fix possible deadlock

## v0.11.0 [unreleased]

### Features
Expand Down
3 changes: 0 additions & 3 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Engine interface {
DeleteMeasurement(name string, seriesKeys []string) error
SeriesCount() (n int, err error)

// PerformMaintenance will get called periodically by the store
PerformMaintenance()

// Format will return the format for the engine
Format() EngineFormat

Expand Down
4 changes: 0 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
// Path returns the path the engine was opened with.
func (e *Engine) Path() string { return e.path }

// PerformMaintenance is for periodic maintenance of the store. A no-op for b1
func (e *Engine) PerformMaintenance() {
}

// Index returns the database index.
func (e *Engine) Index() *tsdb.DatabaseIndex {
e.mu.Lock()
Expand Down
7 changes: 5 additions & 2 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,11 @@ func (d *DatabaseIndex) measurementsByTagFilters(filters []*TagFilter) Measureme
return measurements
}

// measurementsByRegex returns the measurements that match the regex.
func (d *DatabaseIndex) measurementsByRegex(re *regexp.Regexp) Measurements {
// MeasurementsByRegex returns the measurements that match the regex.
func (d *DatabaseIndex) MeasurementsByRegex(re *regexp.Regexp) Measurements {
d.mu.RLock()
defer d.mu.RUnlock()

var matches Measurements
for _, m := range d.measurements {
if re.MatchString(m.Name) {
Expand Down
6 changes: 0 additions & 6 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
// Path returns the path set on the shard when it was created.
func (s *Shard) Path() string { return s.path }

// PerformMaintenance gets called periodically to have the engine perform
// any maintenance tasks like WAL flushing and compaction
func (s *Shard) PerformMaintenance() {
s.engine.PerformMaintenance()
}

// Open initializes and opens the shard's store.
func (s *Shard) Open() error {
if err := func() error {
Expand Down
42 changes: 3 additions & 39 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (s *Store) Open() error {
return err
}

go s.periodicMaintenance()
s.opened = true

return nil
Expand Down Expand Up @@ -567,46 +566,11 @@ func (s *Store) deleteSeries(database string, seriesKeys []string) error {
return nil
}

// periodicMaintenance is the method called in a goroutine on the opening of the store
// to perform periodic maintenance of the shards.
func (s *Store) periodicMaintenance() {
t := time.NewTicker(maintenanceCheckInterval)
for {
select {
case <-t.C:
s.performMaintenance()
case <-s.closing:
t.Stop()
return
}
}
}

// performMaintenance loops through shards and executes any maintenance
// tasks. Those tasks should run in their own goroutines if they will
// take significant time.
func (s *Store) performMaintenance() {
s.mu.Lock()
defer s.mu.Unlock()
for _, sh := range s.shards {
s.performMaintenanceOnShard(sh)
}
}

func (s *Store) performMaintenanceOnShard(shard *Shard) {
defer func() {
if r := recover(); r != nil {
s.Logger.Printf("recovered error in maintenance on shard %d", shard.id)
}
}()
shard.PerformMaintenance()
}

// ExpandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (s *Store) ExpandSources(sources influxql.Sources) (influxql.Sources, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.RLock()
defer s.mu.RUnlock()
return s.expandSources(sources)
}

Expand All @@ -631,7 +595,7 @@ func (s *Store) expandSources(sources influxql.Sources) (influxql.Sources, error
}

// Loop over matching measurements.
for _, m := range db.measurementsByRegex(src.Regex.Val) {
for _, m := range db.MeasurementsByRegex(src.Regex.Val) {
other := &influxql.Measurement{
Database: src.Database,
RetentionPolicy: src.RetentionPolicy,
Expand Down