Skip to content

Commit

Permalink
Merge pull request #3195 from hashicorp/issue-3018-filehandle-leaks
Browse files Browse the repository at this point in the history
Fix socket file handle leaks from old blocking queries upon consul reload
  • Loading branch information
preetapan authored Jun 27, 2017
2 parents 07db760 + d25be86 commit 48983bd
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
13 changes: 11 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ type QueryOptions struct {
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8

// Context (optional) is passed through to the underlying http request layer, can be used
// to set timeouts and deadlines as well as to cancel requests
Context context.Context
}

// WriteOptions are used to parameterize a write
Expand Down Expand Up @@ -457,6 +461,7 @@ type request struct {
body io.Reader
header http.Header
obj interface{}
ctx context.Context
}

// setQueryOptions is used to annotate the request with
Expand Down Expand Up @@ -494,6 +499,7 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.RelayFactor != 0 {
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
}
r.ctx = q.Context
}

// durToMsec converts a duration to a millisecond specified string. If the
Expand Down Expand Up @@ -569,8 +575,11 @@ func (r *request) toHTTP() (*http.Request, error) {
if r.config.HttpAuth != nil {
req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password)
}

return req, nil
if r.ctx != nil {
return req.WithContext(r.ctx), nil
} else {
return req, nil
}
}

// newRequest is used to create a new request
Expand Down
29 changes: 22 additions & 7 deletions watch/funcs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watch

import (
"context"
"fmt"

consulapi "github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -41,7 +42,8 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
}
fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
pair, meta, err := kv.Get(key, &opts)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -70,7 +72,8 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
}
fn := func(p *Plan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
pairs, meta, err := kv.List(prefix, &opts)
if err != nil {
return 0, nil, err
Expand All @@ -89,7 +92,8 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
services, meta, err := catalog.Services(&opts)
if err != nil {
return 0, nil, err
Expand All @@ -108,7 +112,8 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -144,7 +149,8 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
if err != nil {
return 0, nil, err
Expand Down Expand Up @@ -177,7 +183,8 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, stale)
defer p.cancelFunc()
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
Expand Down Expand Up @@ -205,7 +212,8 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {

fn := func(p *Plan) (uint64, interface{}, error) {
event := p.client.Event()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
opts := makeQueryOptionsWithContext(p, false)
defer p.cancelFunc()
events, meta, err := event.List(name, &opts)
if err != nil {
return 0, nil, err
Expand All @@ -222,3 +230,10 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
}
return fn, nil
}

func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions {
ctx, cancel := context.WithCancel(context.Background())
p.cancelFunc = cancel
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex, Context: ctx}
return opts
}
3 changes: 3 additions & 0 deletions watch/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ func (p *Plan) Stop() {
return
}
p.stop = true
if p.cancelFunc != nil {
p.cancelFunc()
}
close(p.stopCh)
}

Expand Down
8 changes: 5 additions & 3 deletions watch/watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watch

import (
"context"
"fmt"
"io"
"sync"
Expand All @@ -27,9 +28,10 @@ type Plan struct {
lastIndex uint64
lastResult interface{}

stop bool
stopCh chan struct{}
stopLock sync.Mutex
stop bool
stopCh chan struct{}
stopLock sync.Mutex
cancelFunc context.CancelFunc
}

// WatcherFunc is used to watch for a diff
Expand Down

0 comments on commit 48983bd

Please sign in to comment.