Skip to content

Commit

Permalink
remove ctx from structs, pass it to func
Browse files Browse the repository at this point in the history
  • Loading branch information
kmulvey committed Sep 9, 2022
1 parent 748e435 commit db60394
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 24 deletions.
4 changes: 2 additions & 2 deletions cmd/nsquared/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
}

// start er up
var id, err = imagedup.NewImageDup(ctx, "imagedup", cacheFile, outputFile, threads, distanceThreshold)
var id, err = imagedup.NewImageDup("imagedup", cacheFile, outputFile, threads, distanceThreshold)
handleErr("NewImageDup", err)

// list all the files
Expand All @@ -83,7 +83,7 @@ func main() {
log.Infof("Found %d dirs", len(files))

log.Info("Started, go to grafana to monitor")
var errors = id.Run(fileNames)
var errors = id.Run(ctx, fileNames)

// wait for all diff workers to finish or we get a shutdown signal
// whichever comes first
Expand Down
6 changes: 4 additions & 2 deletions internal/app/imagedup/files.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package imagedup

import (
"context"

"github.com/kmulvey/imagedup/pkg/types"
)

func (id *ImageDup) streamFiles(files []string) {
func (id *ImageDup) streamFiles(ctx context.Context, files []string) {
var dedup = make(map[string]struct{})
for i, one := range files {
for j, two := range files {
if i != j {
// this protects us from getting nil exception when shutting down
select {
case <-id.Context.Done():
case <-ctx.Done():
close(id.images)
return
default:
Expand Down
10 changes: 5 additions & 5 deletions internal/app/imagedup/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var expectedPairs = map[string]struct{}{
func TestStreamFiles(t *testing.T) {
t.Parallel()

var id, err = NewImageDup(context.Background(), "TestStreamFiles", "cacheFile.json", "outputFile.log", 2, 10)
var id, err = NewImageDup("TestStreamFiles", "cacheFile.json", "outputFile.log", 2, 10)
assert.NoError(t, err)

var done = make(chan struct{})
Expand All @@ -35,16 +35,15 @@ func TestStreamFiles(t *testing.T) {
files = path.OnlyFiles(files)
var fileNames = path.OnlyNames(files)

id.streamFiles(fileNames)
id.streamFiles(context.Background(), fileNames)

<-done
}

func TestStreamFilesCancel(t *testing.T) {
t.Parallel()

var ctx, cancel = context.WithCancel(context.Background())
var id, err = NewImageDup(ctx, "TestStreamFilesCancel", "cacheFile.json", "outputFile.log", 2, 10)
var id, err = NewImageDup("TestStreamFilesCancel", "cacheFile.json", "outputFile.log", 2, 10)
assert.NoError(t, err)

var done = make(chan struct{})
Expand All @@ -58,7 +57,8 @@ func TestStreamFilesCancel(t *testing.T) {
close(done)
}()

go id.streamFiles(make([]string, 100))
var ctx, cancel = context.WithCancel(context.Background())
go id.streamFiles(ctx, make([]string, 100))
cancel()

<-done
Expand Down
14 changes: 6 additions & 8 deletions internal/app/imagedup/hash/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
)

type Differ struct {
ctx context.Context
workChan chan types.Pair
errors chan error
cache *Cache
Expand All @@ -25,15 +24,14 @@ type Differ struct {
comparisonsCompleted prometheus.Gauge
}

func NewDiffer(ctx context.Context, numWorkers, distanceThreshold int, workChan chan types.Pair, cache *Cache, deleteLogger *logrus.Logger, promNamespace string) *Differ {
func NewDiffer(numWorkers, distanceThreshold int, inputImages chan types.Pair, cache *Cache, deleteLogger *logrus.Logger, promNamespace string) *Differ {

if numWorkers <= 0 || numWorkers > runtime.GOMAXPROCS(0)-1 {
numWorkers = 1
}

var dp = &Differ{
ctx: ctx,
workChan: workChan,
workChan: inputImages,
errors: make(chan error),
cache: cache,
deleteLogger: deleteLogger,
Expand All @@ -57,19 +55,19 @@ func NewDiffer(ctx context.Context, numWorkers, distanceThreshold int, workChan
return dp
}

func (dp *Differ) Run() chan error {
func (dp *Differ) Run(ctx context.Context) chan error {
var errorChans = make([]chan error, dp.numWorkers)
for i := 0; i < dp.numWorkers; i++ {
var errors = make(chan error)
errorChans[i] = errors
go dp.diffWorker(errors)
go dp.diffWorker(ctx, errors)
}

dp.errors = goutils.MergeChannels(errorChans...)
return dp.errors
}

func (dp *Differ) diffWorker(errors chan error) {
func (dp *Differ) diffWorker(ctx context.Context, errors chan error) {

// declare these here to reduce allocations in the loop
var start time.Time
Expand All @@ -79,7 +77,7 @@ func (dp *Differ) diffWorker(errors chan error) {

for {
select {
case <-dp.ctx.Done():
case <-ctx.Done():
close(errors)
return
default:
Expand Down
12 changes: 5 additions & 7 deletions internal/app/imagedup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ import (
)

type ImageDup struct {
context.Context
*stats
*hash.Cache
deleteLogger *logrus.Logger
*hash.Differ
images chan types.Pair
}

func NewImageDup(ctx context.Context, promNamespace, hashCacheFile, deleteLogFile string, numWorkers, distanceThreshold int) (*ImageDup, error) {
func NewImageDup(promNamespace, hashCacheFile, deleteLogFile string, numWorkers, distanceThreshold int) (*ImageDup, error) {
var id = new(ImageDup)
var err error

id.Context = ctx
id.images = make(chan types.Pair)
id.stats = newStats(promNamespace)

Expand All @@ -35,16 +33,16 @@ func NewImageDup(ctx context.Context, promNamespace, hashCacheFile, deleteLogFil
return nil, err
}

id.Differ = hash.NewDiffer(ctx, numWorkers, distanceThreshold, id.images, id.Cache, id.deleteLogger, promNamespace)
id.Differ = hash.NewDiffer(numWorkers, distanceThreshold, id.images, id.Cache, id.deleteLogger, promNamespace)

go id.stats.publishStats(id.Cache)

return id, nil
}

func (id *ImageDup) Run(files []string) chan error {
var errors = id.Differ.Run()
id.streamFiles(files)
func (id *ImageDup) Run(ctx context.Context, files []string) chan error {
var errors = id.Differ.Run(ctx)
id.streamFiles(ctx, files)
return errors
}

Expand Down

0 comments on commit db60394

Please sign in to comment.