diff --git a/cmd/plugin.go b/cmd/plugin.go index d2c606389..6b6a28e6a 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -26,6 +26,7 @@ import ( "os" "path/filepath" "strings" + "sync" "syscall" "time" @@ -158,8 +159,6 @@ func stashPluginMain(args []string) { var source []string var dest string - var result error - //var downloaded int64 = 0 var transfers []Transfer if len(args) == 0 && (infile == "" || outfile == "") { @@ -209,10 +208,95 @@ func stashPluginMain(args []string) { defer outputFile.Close() } + var wg sync.WaitGroup + + workChan := make(chan Transfer) // might need to be a Transfer object + results := make(chan *classads.ClassAd, len(transfers)) + + // start workers + for i := 1; i <= 5; i++ { + wg.Add(1) + go moveObjects(source, methods, upload, &wg, workChan, results) + } + + for _, transfer := range transfers { + workChan <- transfer + } + close(workChan) + + // Wait for transfers + wg.Wait() + var resultAds []*classads.ClassAd + // Every transfer should send a resultAd to the results channel + for i := 0; i < len(transfers); i++ { + select { + case resultAd := <-results: + resultAds = append(resultAds, resultAd) + default: + // If we did not get a result, something terrible happened + log.Errorln(errors.New("Failed to get resultAds from transfer")) + } + } + + success := true retryable := false - for _, transfer := range transfers { + for _, resultAd := range resultAds { + _, err := outputFile.WriteString(resultAd.String() + "\n") + if err != nil { + log.Errorln("Failed to write to outfile:", err) + os.Exit(1) + } + transferSuccess, err := resultAd.Get("TransferSuccess") + if err != nil { + log.Errorln("Failed to get TransferSuccess:", err) + success = false + } + success = success && transferSuccess.(bool) + // If we do not get a success, check if it is retryable + if !success { + retryableTransfer, err := resultAd.Get("TransferRetryable") + if err != nil { + log.Errorln("Failed to see if ad is retryable", err) + } + retryable = retryableTransfer.(bool) + } + } + + if err = outputFile.Sync(); err != nil { + var perr *fs.PathError + var serr syscall.Errno + // Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error + // Error code EINVAL is returned on Linux + // Error code ENODEV is returned on Mac OS X + if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) { + log.Debugf("Error when syncing: %s; can be ignored\n", perr) + } else { + if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) { + log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr)) + } else { + log.Errorln("Failed to sync output file:", err) + } + os.Exit(1) + } + } + + if success { + os.Exit(0) + } else if retryable { + os.Exit(11) + } else { + os.Exit(1) + } +} +// moveObjects performs the appropriate download or upload functions for the plugin as well as +// writes the resultAds for each transfer +// Returns: resultAds and if an error given is retryable +func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGroup, workChan <-chan Transfer, results chan<- *classads.ClassAd) { + defer wg.Done() + var result error + for transfer := range workChan { //instead range workChan var tmpDownloaded int64 if upload { source = append(source, transfer.localFile) @@ -266,61 +350,17 @@ func stashPluginMain(args []string) { } errMsg += transfer.url + ": " + client.GetErrors() resultAd.Set("TransferError", errMsg) - client.ClearErrors() } resultAd.Set("TransferFileBytes", 0) resultAd.Set("TransferTotalBytes", 0) if client.ErrorsRetryable() { resultAd.Set("TransferRetryable", true) - retryable = true } else { resultAd.Set("TransferRetryable", false) - retryable = false - - } - } - resultAds = append(resultAds, resultAd) - - } - - success := true - for _, resultAd := range resultAds { - _, err := outputFile.WriteString(resultAd.String() + "\n") - if err != nil { - log.Errorln("Failed to write to outfile:", err) - os.Exit(1) - } - transferSuccess, err := resultAd.Get("TransferSuccess") - if err != nil { - log.Errorln("Failed to get TransferSuccess:", err) - success = false - } - success = success && transferSuccess.(bool) - } - if err = outputFile.Sync(); err != nil { - var perr *fs.PathError - var serr syscall.Errno - // Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error - // Error code EINVAL is returned on Linux - // Error code ENODEV is returned on Mac OS X - if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) { - log.Debugf("Error when syncing: %s; can be ignored\n", perr) - } else { - if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) { - log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr)) - } else { - log.Errorln("Failed to sync output file:", err) } - os.Exit(1) + client.ClearErrors() } - } - - if success { - os.Exit(0) - } else if retryable { - os.Exit(11) - } else { - os.Exit(1) + results <- resultAd } }