diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index a65993915b..0081c8c0b5 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -130,16 +130,12 @@ type mirrorJob struct { // channel for errors errorCh chan *probe.Error - // Contains if watcher is currently running. - watcherRunning bool - // the global watcher object, which receives notifications of created // and deleted files watcher *Watcher // Hold operation status information - status Status - scanBar scanBarFunc + status Status // waitgroup for status goroutine, waits till all status // messages have been written and received @@ -149,11 +145,6 @@ type mirrorJob struct { // channel for status messages statusCh chan URLs - // Store last watch error - watchErr *probe.Error - // Store last mirror error - mirrorErr *probe.Error - TotalObjects int64 TotalBytes int64 @@ -252,8 +243,6 @@ func (mj *mirrorJob) startStatus() { for sURLs := range mj.statusCh { if sURLs.Error != nil { - // Save last mirror error - mj.mirrorErr = sURLs.Error // Print in new line and adjust to top so that we // don't print over the ongoing progress bar. if sURLs.SourceContent != nil { @@ -284,12 +273,12 @@ func (mj *mirrorJob) stopStatus() { } // this goroutine will watch for notifications, and add modified objects to the queue -func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.CancelFunc) { - +func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.CancelFunc, errCh chan<- *probe.Error) { for { select { case event, ok := <-mj.watcher.Events(): if !ok { + errCh <- nil // channel closed return } @@ -407,15 +396,17 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance } case err := <-mj.watcher.Errors(): - mj.watchErr = err switch err.ToGoError().(type) { case APINotImplemented: - // Ignore error if API is not implemented. - mj.watcherRunning = false - return - default: - errorIf(err, "Unexpected error during monitoring.") + errorIf(err.Trace(), "Unable to Watch on source, ignoring.") + // Watch is perhaps not implemented, log and ignore the error. + err = nil } + errCh <- err + return + case <-mj.trapCh: + errCh <- nil + return } } } @@ -433,7 +424,7 @@ func (mj *mirrorJob) watchURL(sourceClient Client) *probe.Error { } // Fetch urls that need to be mirrored -func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc) { +func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc, errCh chan<- *probe.Error) { var totalBytes int64 var totalObjects int64 @@ -445,8 +436,10 @@ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.Cance select { case sURLs, ok := <-URLsCh: if !ok { - // finished harvesting urls - goto exit + close(queueCh) + parallel.wait() + errCh <- nil + return } if sURLs.Error != nil { if strings.Contains(sURLs.Error.ToGoError().Error(), " is a folder.") { @@ -482,40 +475,42 @@ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.Cance } case <-mj.trapCh: cancelMirror() - os.Exit(0) + errCh <- nil + return } } - -exit: - close(queueCh) - parallel.wait() } // when using a struct for copying, we could save a lot of passing of variables -func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc) { - if globalQuiet || globalJSON { - } else { - // Enable progress bar reader only during default mode - mj.scanBar = scanBarFactory() - } - +func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc) *probe.Error { // start the status go routine mj.startStatus() - // Starts additional watcher thread for watching for new events. + watchErrCh := make(chan *probe.Error) + mirrorErrCh := make(chan *probe.Error) + + // Starts watcher loop for watching for new events. if mj.isWatch { - go mj.watchMirror(ctx, cancelMirror) + go mj.watchMirror(ctx, cancelMirror, watchErrCh) } // Start mirroring. - mj.startMirror(ctx, cancelMirror) + go mj.startMirror(ctx, cancelMirror, mirrorErrCh) + + if err := <-mirrorErrCh; err != nil { + return err.Trace() + } if mj.isWatch { - <-mj.trapCh + if err := <-watchErrCh; err != nil { + return err.Trace() + } } // Wait until all progress bar updates are actually shown and quit. mj.stopStatus() + + return nil } func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch bool, excludeOptions []string) *mirrorJob { @@ -541,13 +536,11 @@ func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch isWatch: isWatch, excludeOptions: excludeOptions, - status: status, - scanBar: func(s string) {}, - statusCh: make(chan URLs), - queueCh: make(chan func() URLs), - wgStatus: new(sync.WaitGroup), - watcherRunning: true, - watcher: NewWatcher(UTCNow()), + status: status, + statusCh: make(chan URLs), + queueCh: make(chan func() URLs), + wgStatus: new(sync.WaitGroup), + watcher: NewWatcher(UTCNow()), } return &mj @@ -628,17 +621,13 @@ func runMirror(srcURL, dstURL string, ctx *cli.Context) *probe.Error { if d.Diff == differInFirst { // Bucket only exists in the source, create the same bucket in the destination - err := newDstClt.MakeBucket(ctx.String("region"), false) - if err != nil { - mj.mirrorErr = err + if err := newDstClt.MakeBucket(ctx.String("region"), false); err != nil { errorIf(err, "Cannot created bucket in `"+newTgtURL+"`") continue } // Copy policy rules from source to dest if flag is activated if ctx.Bool("a") { - err := copyBucketPolicies(srcClt, dstClt, isOverwrite) - if err != nil { - mj.mirrorErr = err + if err := copyBucketPolicies(srcClt, dstClt, isOverwrite); err != nil { errorIf(err, "Cannot copy bucket policies to `"+newDstClt.GetURL().String()+"`") } } @@ -661,20 +650,12 @@ func runMirror(srcURL, dstURL string, ctx *cli.Context) *probe.Error { mj.status.fatalIf(err, fmt.Sprintf("Failed to start monitoring.")) } } + ctxt, cancelMirror := context.WithCancel(context.Background()) defer cancelMirror() - // Start mirroring job - mj.mirror(ctxt, cancelMirror) - // Check for errors during mirroring or watching to return - if mj.mirrorErr != nil { - return mj.mirrorErr - } - if mj.watchErr != nil { - return mj.watchErr - } - - return nil + // Start mirroring job + return mj.mirror(ctxt, cancelMirror) } // Main entry point for mirror command. @@ -692,6 +673,7 @@ func mainMirror(ctx *cli.Context) error { tgtURL := args[1] if err := runMirror(srcURL, tgtURL, ctx); err != nil { + errorIf(err.Trace(srcURL, tgtURL), "Unable to mirror") return exitStatus(globalErrorExitStatus) } diff --git a/cmd/parallel-manager.go b/cmd/parallel-manager.go index 4cf622724b..cb732fc5da 100644 --- a/cmd/parallel-manager.go +++ b/cmd/parallel-manager.go @@ -73,8 +73,8 @@ func (p *ParallelManager) addWorker() { go func() { for { // Wait for jobs - fn := <-p.queueCh - if fn == nil { + fn, ok := <-p.queueCh + if !ok { // No more tasks, quit p.wg.Done() return