Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mirror with --watch should copy and exit for s3 endpoints #2345

Merged
merged 1 commit into from
Jan 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 45 additions & 63 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,12 @@ type mirrorJob struct {
// channel for errors
errorCh chan *probe.Error

// Contains if watcher is currently running.
watcherRunning bool

// the global watcher object, which receives notifications of created
// and deleted files
watcher *Watcher

// Hold operation status information
status Status
scanBar scanBarFunc
status Status

// waitgroup for status goroutine, waits till all status
// messages have been written and received
Expand All @@ -149,11 +145,6 @@ type mirrorJob struct {
// channel for status messages
statusCh chan URLs

// Store last watch error
watchErr *probe.Error
// Store last mirror error
mirrorErr *probe.Error

TotalObjects int64
TotalBytes int64

Expand Down Expand Up @@ -252,8 +243,6 @@ func (mj *mirrorJob) startStatus() {

for sURLs := range mj.statusCh {
if sURLs.Error != nil {
// Save last mirror error
mj.mirrorErr = sURLs.Error
// Print in new line and adjust to top so that we
// don't print over the ongoing progress bar.
if sURLs.SourceContent != nil {
Expand Down Expand Up @@ -284,12 +273,12 @@ func (mj *mirrorJob) stopStatus() {
}

// this goroutine will watch for notifications, and add modified objects to the queue
func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.CancelFunc) {

func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.CancelFunc, errCh chan<- *probe.Error) {
for {
select {
case event, ok := <-mj.watcher.Events():
if !ok {
errCh <- nil
// channel closed
return
}
Expand Down Expand Up @@ -407,15 +396,17 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance
}

case err := <-mj.watcher.Errors():
mj.watchErr = err
switch err.ToGoError().(type) {
case APINotImplemented:
// Ignore error if API is not implemented.
mj.watcherRunning = false
return
default:
errorIf(err, "Unexpected error during monitoring.")
errorIf(err.Trace(), "Unable to Watch on source, ignoring.")
// Watch is perhaps not implemented, log and ignore the error.
err = nil
}
errCh <- err
return
case <-mj.trapCh:
errCh <- nil
return
}
}
}
Expand All @@ -433,7 +424,7 @@ func (mj *mirrorJob) watchURL(sourceClient Client) *probe.Error {
}

// Fetch urls that need to be mirrored
func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc) {
func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc, errCh chan<- *probe.Error) {
var totalBytes int64
var totalObjects int64

Expand All @@ -445,8 +436,10 @@ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.Cance
select {
case sURLs, ok := <-URLsCh:
if !ok {
// finished harvesting urls
goto exit
close(queueCh)
parallel.wait()
errCh <- nil
return
}
if sURLs.Error != nil {
if strings.Contains(sURLs.Error.ToGoError().Error(), " is a folder.") {
Expand Down Expand Up @@ -482,40 +475,42 @@ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.Cance
}
case <-mj.trapCh:
cancelMirror()
os.Exit(0)
errCh <- nil
return
}
}

exit:
close(queueCh)
parallel.wait()
}

// when using a struct for copying, we could save a lot of passing of variables
func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc) {
if globalQuiet || globalJSON {
} else {
// Enable progress bar reader only during default mode
mj.scanBar = scanBarFactory()
}

func (mj *mirrorJob) mirror(ctx context.Context, cancelMirror context.CancelFunc) *probe.Error {
// start the status go routine
mj.startStatus()

// Starts additional watcher thread for watching for new events.
watchErrCh := make(chan *probe.Error)
mirrorErrCh := make(chan *probe.Error)

// Starts watcher loop for watching for new events.
if mj.isWatch {
go mj.watchMirror(ctx, cancelMirror)
go mj.watchMirror(ctx, cancelMirror, watchErrCh)
}

// Start mirroring.
mj.startMirror(ctx, cancelMirror)
go mj.startMirror(ctx, cancelMirror, mirrorErrCh)

if err := <-mirrorErrCh; err != nil {
return err.Trace()
}

if mj.isWatch {
<-mj.trapCh
if err := <-watchErrCh; err != nil {
return err.Trace()
}
}

// Wait until all progress bar updates are actually shown and quit.
mj.stopStatus()

return nil
}

func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch bool, excludeOptions []string) *mirrorJob {
Expand All @@ -541,13 +536,11 @@ func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch
isWatch: isWatch,
excludeOptions: excludeOptions,

status: status,
scanBar: func(s string) {},
statusCh: make(chan URLs),
queueCh: make(chan func() URLs),
wgStatus: new(sync.WaitGroup),
watcherRunning: true,
watcher: NewWatcher(UTCNow()),
status: status,
statusCh: make(chan URLs),
queueCh: make(chan func() URLs),
wgStatus: new(sync.WaitGroup),
watcher: NewWatcher(UTCNow()),
}

return &mj
Expand Down Expand Up @@ -628,17 +621,13 @@ func runMirror(srcURL, dstURL string, ctx *cli.Context) *probe.Error {

if d.Diff == differInFirst {
// Bucket only exists in the source, create the same bucket in the destination
err := newDstClt.MakeBucket(ctx.String("region"), false)
if err != nil {
mj.mirrorErr = err
if err := newDstClt.MakeBucket(ctx.String("region"), false); err != nil {
errorIf(err, "Cannot created bucket in `"+newTgtURL+"`")
continue
}
// Copy policy rules from source to dest if flag is activated
if ctx.Bool("a") {
err := copyBucketPolicies(srcClt, dstClt, isOverwrite)
if err != nil {
mj.mirrorErr = err
if err := copyBucketPolicies(srcClt, dstClt, isOverwrite); err != nil {
errorIf(err, "Cannot copy bucket policies to `"+newDstClt.GetURL().String()+"`")
}
}
Expand All @@ -661,20 +650,12 @@ func runMirror(srcURL, dstURL string, ctx *cli.Context) *probe.Error {
mj.status.fatalIf(err, fmt.Sprintf("Failed to start monitoring."))
}
}

ctxt, cancelMirror := context.WithCancel(context.Background())
defer cancelMirror()
// Start mirroring job
mj.mirror(ctxt, cancelMirror)

// Check for errors during mirroring or watching to return
if mj.mirrorErr != nil {
return mj.mirrorErr
}
if mj.watchErr != nil {
return mj.watchErr
}

return nil
// Start mirroring job
return mj.mirror(ctxt, cancelMirror)
}

// Main entry point for mirror command.
Expand All @@ -692,6 +673,7 @@ func mainMirror(ctx *cli.Context) error {
tgtURL := args[1]

if err := runMirror(srcURL, tgtURL, ctx); err != nil {
errorIf(err.Trace(srcURL, tgtURL), "Unable to mirror")
return exitStatus(globalErrorExitStatus)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/parallel-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (p *ParallelManager) addWorker() {
go func() {
for {
// Wait for jobs
fn := <-p.queueCh
if fn == nil {
fn, ok := <-p.queueCh
if !ok {
// No more tasks, quit
p.wg.Done()
return
Expand Down