Skip to content

Commit

Permalink
Have pelican plugin move files in parallel
Browse files Browse the repository at this point in the history
Now the plugin can move files just like handle_http does, starts up many
workers and has a producer/consumer setup to download files in parallel.
  • Loading branch information
joereuss12 committed Jan 23, 2024
1 parent cb64873 commit ea45067
Showing 1 changed file with 89 additions and 49 deletions.
138 changes: 89 additions & 49 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -158,8 +159,6 @@ func stashPluginMain(args []string) {

var source []string
var dest string
var result error
//var downloaded int64 = 0
var transfers []Transfer

if len(args) == 0 && (infile == "" || outfile == "") {
Expand Down Expand Up @@ -209,10 +208,95 @@ func stashPluginMain(args []string) {
defer outputFile.Close()
}

var wg sync.WaitGroup

workChan := make(chan Transfer) // might need to be a Transfer object
results := make(chan *classads.ClassAd, len(transfers))

// start workers
for i := 1; i <= 5; i++ {
wg.Add(1)
go moveObjects(source, methods, upload, &wg, workChan, results)
}

for _, transfer := range transfers {
workChan <- transfer
}
close(workChan)

// Wait for transfers
wg.Wait()

var resultAds []*classads.ClassAd
// Every transfer should send a resultAd to the results channel
for i := 0; i < len(transfers); i++ {
select {
case resultAd := <-results:
resultAds = append(resultAds, resultAd)
default:
// If we did not get a result, something terrible happened
log.Errorln(errors.New("Failed to get resultAds from transfer"))
}
}

success := true
retryable := false
for _, transfer := range transfers {
for _, resultAd := range resultAds {
_, err := outputFile.WriteString(resultAd.String() + "\n")
if err != nil {
log.Errorln("Failed to write to outfile:", err)
os.Exit(1)
}
transferSuccess, err := resultAd.Get("TransferSuccess")
if err != nil {
log.Errorln("Failed to get TransferSuccess:", err)
success = false
}
success = success && transferSuccess.(bool)
// If we do not get a success, check if it is retryable
if !success {
retryableTransfer, err := resultAd.Get("TransferRetryable")
if err != nil {
log.Errorln("Failed to see if ad is retryable", err)
}
retryable = retryableTransfer.(bool)
}
}

if err = outputFile.Sync(); err != nil {
var perr *fs.PathError
var serr syscall.Errno
// Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error
// Error code EINVAL is returned on Linux
// Error code ENODEV is returned on Mac OS X
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) {
log.Debugf("Error when syncing: %s; can be ignored\n", perr)
} else {
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) {
log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr))
} else {
log.Errorln("Failed to sync output file:", err)
}
os.Exit(1)
}
}

if success {
os.Exit(0)
} else if retryable {
os.Exit(11)
} else {
os.Exit(1)
}
}

// moveObjects performs the appropriate download or upload functions for the plugin as well as
// writes the resultAds for each transfer
// Returns: resultAds and if an error given is retryable
func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGroup, workChan <-chan Transfer, results chan<- *classads.ClassAd) {
defer wg.Done()
var result error
for transfer := range workChan { //instead range workChan
var tmpDownloaded int64
if upload {
source = append(source, transfer.localFile)
Expand Down Expand Up @@ -266,61 +350,17 @@ func stashPluginMain(args []string) {
}
errMsg += transfer.url + ": " + client.GetErrors()
resultAd.Set("TransferError", errMsg)
client.ClearErrors()
}
resultAd.Set("TransferFileBytes", 0)
resultAd.Set("TransferTotalBytes", 0)
if client.ErrorsRetryable() {
resultAd.Set("TransferRetryable", true)
retryable = true
} else {
resultAd.Set("TransferRetryable", false)
retryable = false

}
}
resultAds = append(resultAds, resultAd)

}

success := true
for _, resultAd := range resultAds {
_, err := outputFile.WriteString(resultAd.String() + "\n")
if err != nil {
log.Errorln("Failed to write to outfile:", err)
os.Exit(1)
}
transferSuccess, err := resultAd.Get("TransferSuccess")
if err != nil {
log.Errorln("Failed to get TransferSuccess:", err)
success = false
}
success = success && transferSuccess.(bool)
}
if err = outputFile.Sync(); err != nil {
var perr *fs.PathError
var serr syscall.Errno
// Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error
// Error code EINVAL is returned on Linux
// Error code ENODEV is returned on Mac OS X
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) && (int(serr) == 1 || serr == syscall.EINVAL || serr == syscall.ENODEV) {
log.Debugf("Error when syncing: %s; can be ignored\n", perr)
} else {
if errors.As(err, &perr) && errors.As(perr.Unwrap(), &serr) {
log.Errorf("Failed to sync output file: %s (errno %d)", serr, int(serr))
} else {
log.Errorln("Failed to sync output file:", err)
}
os.Exit(1)
client.ClearErrors()
}
}

if success {
os.Exit(0)
} else if retryable {
os.Exit(11)
} else {
os.Exit(1)
results <- resultAd
}
}

Expand Down

0 comments on commit ea45067

Please sign in to comment.