Skip to content

Commit

Permalink
Merge pull request #51 from ipfs/fix/query-goroutine-leak
Browse files Browse the repository at this point in the history
fix a goroutine leak killing the gateways
  • Loading branch information
Stebalien authored Dec 18, 2018
2 parents bbeee3c + 3c828de commit dc81160
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ sudo: false
language: go

go:
- 1.9.x
- 1.11.x

install:
- make deps
Expand Down
43 changes: 30 additions & 13 deletions flatfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/jbenet/goprocess"

logging "github.com/ipfs/go-log"
)
Expand Down Expand Up @@ -631,18 +632,25 @@ func (fs *Datastore) Query(q query.Query) (query.Results, error) {
return nil, errors.New("flatfs only supports listing all keys in random order")
}

reschan := make(chan query.Result, query.KeysOnlyBufSize)
go func() {
defer close(reschan)
err := fs.walkTopLevel(fs.path, reschan)
if err != nil {
reschan <- query.Result{Error: errors.New("walk failed: " + err.Error())}
// Replicates the logic in ResultsWithChan but actually respects calls
// to `Close`.
b := query.NewResultBuilder(q)
b.Process.Go(func(p goprocess.Process) {
err := fs.walkTopLevel(fs.path, b)
if err == nil {
return
}
}()
return query.ResultsWithChan(q, reschan), nil
select {
case b.Output <- query.Result{Error: errors.New("walk failed: " + err.Error())}:
case <-p.Closing():
}
})
go b.Process.CloseAfterChildren()

return b.Results(), nil
}

func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error {
func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) error {
dir, err := os.Open(path)
if err != nil {
return err
Expand All @@ -653,16 +661,21 @@ func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error
return err
}
for _, dir := range names {

if len(dir) == 0 || dir[0] == '.' {
continue
}

err = fs.walk(filepath.Join(path, dir), reschan)
err = fs.walk(filepath.Join(path, dir), result)
if err != nil {
return err
}

// Are we closing?
select {
case <-result.Process.Closing():
return nil
default:
}
}
return nil
}
Expand Down Expand Up @@ -957,7 +970,7 @@ func (fs *Datastore) Accuracy() string {
return string(fs.storedValue.Accuracy)
}

func (fs *Datastore) walk(path string, reschan chan query.Result) error {
func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {
dir, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -993,10 +1006,14 @@ func (fs *Datastore) walk(path string, reschan chan query.Result) error {
continue
}

reschan <- query.Result{
select {
case result.Output <- query.Result{
Entry: query.Entry{
Key: key.String(),
},
}:
case <-result.Process.Closing():
return nil
}
}
return nil
Expand Down
31 changes: 31 additions & 0 deletions flatfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,3 +988,34 @@ func BenchmarkBatchedPut(b *testing.B) {
}
b.StopTimer() // avoid counting cleanup
}

func TestQueryLeak(t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()

fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
defer fs.Close()

for i := 0; i < 1000; i++ {
err = fs.Put(datastore.NewKey(fmt.Sprint(i)), []byte("foobar"))
if err != nil {
t.Fatalf("Put fail: %v\n", err)
}
}

before := runtime.NumGoroutine()
for i := 0; i < 200; i++ {
res, err := fs.Query(query.Query{KeysOnly: true})
if err != nil {
t.Errorf("Query fail: %v\n", err)
}
res.Close()
}
after := runtime.NumGoroutine()
if after-before > 100 {
t.Errorf("leaked %d goroutines", after-before)
}
}
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
"hash": "QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D",
"name": "go-datastore",
"version": "3.4.0"
},
{
"author": "whyrusleeping",
"hash": "QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP",
"name": "goprocess",
"version": "1.0.0"
}
],
"gxVersion": "0.8.0",
Expand Down

0 comments on commit dc81160

Please sign in to comment.