Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent deadlock during path reading / results populating #14

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 69 additions & 89 deletions impi.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package impi

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand All @@ -9,14 +10,12 @@ import (
"strings"

"github.com/kisielk/gotool"
"golang.org/x/sync/errgroup"
)

// Impi is a single instance that can perform verification on a path
type Impi struct {
numWorkers int
resultChan chan interface{}
filePathsChan chan string
stopChan chan bool
verifyOptions *VerifyOptions
SkipPathRegexes []*regexp.Regexp
}
Expand All @@ -25,7 +24,6 @@ type Impi struct {
type ImportGroupVerificationScheme int

const (

// ImportGroupVerificationSchemeSingle allows for a single, sorted group
ImportGroupVerificationSchemeSingle = ImportGroupVerificationScheme(iota)

Expand Down Expand Up @@ -70,10 +68,7 @@ type ErrorReporter interface {
// NewImpi creates a new impi instance
func NewImpi(numWorkers int) (*Impi, error) {
newImpi := &Impi{
numWorkers: numWorkers,
resultChan: make(chan interface{}, 1024),
filePathsChan: make(chan string),
stopChan: make(chan bool),
numWorkers: numWorkers,
}

return newImpi, nil
Expand All @@ -82,7 +77,6 @@ func NewImpi(numWorkers int) (*Impi, error) {
// Verify will iterate over the path and start verifying import correctness within
// all .go files in the path. Path follows go tool semantics (e.g. ./...)
func (i *Impi) Verify(rootPath string, verifyOptions *VerifyOptions, errorReporter ErrorReporter) error {

// save stuff for current session
i.verifyOptions = verifyOptions

Expand All @@ -96,25 +90,45 @@ func (i *Impi) Verify(rootPath string, verifyOptions *VerifyOptions, errorReport
i.SkipPathRegexes = append(i.SkipPathRegexes, skipPathRegex)
}

// spin up the workers do handle all the data in the channel. workers will die
if err := i.createWorkers(i.numWorkers); err != nil {
return err
}
numErrors := 0
resultsCh := make(chan VerificationError)
filePathsCh := make(chan string, i.numWorkers)

// populate paths channel from path. paths channel will contain .go source file paths
if err := i.populatePathsChan(rootPath); err != nil {
g, ctx := errgroup.WithContext(context.TODO())
g.Go(func() error {
for res := range resultsCh {
errorReporter.Report(res)
numErrors++
}
return nil
})
g.Go(func() error {
defer close(filePathsCh)
// When the populate paths function finishes up (error or not), filePathsCh will be closed. This will
// allow the workers goroutine to finish up, as all iterations over this channel will stop.
return i.populatePathsChan(ctx, rootPath, filePathsCh)
})
g.Go(func() error {
defer close(resultsCh)
// If all workers fail, the results reading goroutine will safely stop as resultsCh becomes closed. The
// file path populating goroutine will end up trying to write to filePathsCh whilst nothing is reading
// from it; deadlock is prevented here because errgroup will cancel the context that is passed down.
// resultsCh is always going to be read to completion (there is no error cases in the results reading
// goroutine), so there is no possibility of deadlock when trying to write to this channel.
return i.createWorkers(filePathsCh, resultsCh)
})
if err := g.Wait(); err != nil {
return err
}

// wait for worker completion. if an error was reported, return error
if numErrors := i.waitWorkerCompletion(errorReporter); numErrors != 0 {
if numErrors != 0 {
return fmt.Errorf("Found %d errors", numErrors)
}

return nil
}

func (i *Impi) populatePathsChan(rootPath string) error {
func (i *Impi) populatePathsChan(ctx context.Context, rootPath string, filePathsCh chan<- string) error {
// TODO: this should be done in parallel

// get all the packages in the root path, following go 1.9 semantics
Expand All @@ -141,85 +155,47 @@ func (i *Impi) populatePathsChan(rootPath string) error {
continue
}

i.addFilePathToFilePathsChan(path.Join(packagePath, fileInfo.Name()))
if err := i.addFilePathToFilePathsChan(ctx, path.Join(packagePath, fileInfo.Name()), filePathsCh); err != nil {
return err
}
}

} else {

// shove path to channel if passes filter
i.addFilePathToFilePathsChan(packagePath)
}
}

// close the channel to signify we won't add any more data
close(i.filePathsChan)

return nil
}

func (i *Impi) waitWorkerCompletion(errorReporter ErrorReporter) int {
numWorkersComplete := 0
numErrorsReported := 0

for result := range i.resultChan {
switch typedResult := result.(type) {
case VerificationError:
errorReporter.Report(typedResult)
numErrorsReported++
case bool:
numWorkersComplete++
}

// if we're done, break the loop
if numWorkersComplete == i.numWorkers {
break
if err := i.addFilePathToFilePathsChan(ctx, packagePath, filePathsCh); err != nil {
return err
}
}
}

return numErrorsReported
}

func (i *Impi) createWorkers(numWorkers int) error {
for workerIndex := 0; workerIndex < numWorkers; workerIndex++ {
go i.verifyPathsFromChan()
}

return nil
}

func (i *Impi) verifyPathsFromChan() error {

// create a verifier with which we'll verify modules
verifier, err := newVerifier()
if err != nil {
return err
}

// while we're not done
for filePath := range i.filePathsChan {
func (i *Impi) createWorkers(filePathsCh <-chan string, resultsCh chan<- VerificationError) error {
var g errgroup.Group
for idx := 0; idx < i.numWorkers; idx++ {
g.Go(func() error {
// create a verifier with which we'll verify modules
verifier, err := newVerifier()
if err != nil {
return err
}

// open the file
file, err := os.Open(filePath)
if err != nil {
return err
}
for filePath := range filePathsCh {
f, err := os.Open(filePath)
if err != nil {
return err
}

// verify the path and report an error if one is found
if err = verifier.verify(file, i.verifyOptions); err != nil {
verificationError := VerificationError{
error: err,
FilePath: filePath,
// verify the path and report an error if one is found
if err = verifier.verify(f, i.verifyOptions); err != nil {
resultsCh <- VerificationError{error: err, FilePath: filePath}
}
}

// write to results channel
i.resultChan <- verificationError
}
return nil
})
}

// a boolean in the result chan signifies that we're done
i.resultChan <- true

return nil
return g.Wait()
}

func isDir(path string) bool {
Expand All @@ -231,25 +207,29 @@ func isDir(path string) bool {
return info.IsDir()
}

func (i *Impi) addFilePathToFilePathsChan(filePath string) {

func (i *Impi) addFilePathToFilePathsChan(ctx context.Context, filePath string, filePathsCh chan<- string) error {
// skip non-go files
if !strings.HasSuffix(filePath, ".go") {
return
return nil
}

// skip tests if not desired
if strings.HasSuffix(filePath, "_test.go") && i.verifyOptions.SkipTests {
return
return nil
}

// cmd/impi/main.go should check the patters
for _, skipPathRegex := range i.SkipPathRegexes {
if skipPathRegex.Match([]byte(filePath)) {
return
return nil
}
}

// write to paths chan
i.filePathsChan <- filePath
select {
case <-ctx.Done():
return ctx.Err()
case filePathsCh <- filePath:
return nil
}
}