Skip to content

Commit

Permalink
make the api more controlable
Browse files Browse the repository at this point in the history
  • Loading branch information
kmulvey committed Sep 9, 2022
1 parent c33e72f commit ee4606d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
4 changes: 2 additions & 2 deletions cmd/nsquared/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
log.Infof("Found %d dirs", len(files))

log.Info("Started, go to grafana to monitor")
id.Start(fileNames)
var errors = id.Run(fileNames)

// wait for all diff workers to finish or we get a shutdown signal
// whichever comes first
Expand All @@ -92,7 +92,7 @@ func main() {
select {
case <-gracefulShutdown:
graceful = false
case <-id.Wait():
case <-errors:
workers = false
}
}
Expand Down
18 changes: 10 additions & 8 deletions internal/app/imagedup/hash/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Differ struct {
errors chan error
cache *Cache
deleteLogger *logrus.Logger
numWorkers int
distanceThreshold int
diffTime prometheus.Gauge
comparisonsCompleted prometheus.Gauge
Expand All @@ -37,6 +38,7 @@ func NewDiffer(ctx context.Context, numWorkers, distanceThreshold int, workChan
cache: cache,
deleteLogger: deleteLogger,
distanceThreshold: distanceThreshold,
numWorkers: numWorkers,
diffTime: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: promNamespace,
Expand All @@ -52,22 +54,22 @@ func NewDiffer(ctx context.Context, numWorkers, distanceThreshold int, workChan
prometheus.MustRegister(dp.diffTime)
prometheus.MustRegister(dp.comparisonsCompleted)

var errorChans = make([]chan error, numWorkers)
for i := 0; i < numWorkers; i++ {
return dp
}

func (dp *Differ) Run() chan error {
var errorChans = make([]chan error, dp.numWorkers)
for i := 0; i < dp.numWorkers; i++ {
var errors = make(chan error)
errorChans[i] = errors
go dp.run(errors)
go dp.diffWorker(errors)
}

dp.errors = goutils.MergeChannels(errorChans...)
return dp
}

func (dp *Differ) Wait() chan error {
return dp.errors
}

func (dp *Differ) run(errors chan error) {
func (dp *Differ) diffWorker(errors chan error) {

// declare these here to reduce allocations in the loop
var start time.Time
Expand Down
8 changes: 3 additions & 5 deletions internal/app/imagedup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ func NewImageDup(ctx context.Context, promNamespace, hashCacheFile, deleteLogFil
return id, nil
}

func (id *ImageDup) Start(files []string) {
func (id *ImageDup) Run(files []string) chan error {
var errors = id.Differ.Run()
id.streamFiles(files)
return errors
}

func (id *ImageDup) Shutdown(cacheFile string) error {
return id.Cache.Persist(cacheFile)
}

func (id *ImageDup) Errors() error {
return <-id.Differ.Wait()
}

0 comments on commit ee4606d

Please sign in to comment.