Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1098 from bbockelm/multi_plugin_test
Browse files Browse the repository at this point in the history
Fix work deadlock
  • Loading branch information
haoming29 authored Apr 12, 2024
2 parents cb4b262 + 94a087c commit 5c31b60
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 13 deletions.
33 changes: 25 additions & 8 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (te *TransferEngine) newPelicanURL(remoteUrl *url.URL) (pelicanURL pelicanU
func NewTransferEngine(ctx context.Context) *TransferEngine {
ctx, cancel := context.WithCancel(ctx)
egrp, _ := errgroup.WithContext(ctx)
work := make(chan *clientTransferJob)
work := make(chan *clientTransferJob, 5)
files := make(chan *clientTransferFile)
results := make(chan *clientTransferResults, 5)
suppressedLoader := ttlcache.NewSuppressedLoader(loader, new(singleflight.Group))
Expand All @@ -542,6 +542,7 @@ func NewTransferEngine(ctx context.Context) *TransferEngine {
)

// Start our cache for url metadata
// This is stopped in the `Shutdown` method
go pelicanURLCache.Start()

te := &TransferEngine{
Expand All @@ -553,7 +554,7 @@ func NewTransferEngine(ctx context.Context) *TransferEngine {
results: results,
resultsMap: make(map[uuid.UUID]chan *TransferResults),
workMap: make(map[uuid.UUID]chan *TransferJob),
jobLookupDone: make(chan *clientTransferJob),
jobLookupDone: make(chan *clientTransferJob, 5),
notifyChan: make(chan bool),
closeChan: make(chan bool),
closeDoneChan: make(chan bool),
Expand Down Expand Up @@ -692,6 +693,7 @@ func (te *TransferEngine) Close() {
func (te *TransferEngine) runMux() error {
tmpResults := make(map[uuid.UUID][]*TransferResults)
activeJobs := make(map[uuid.UUID][]*TransferJob)
var clientJob *clientTransferJob
closing := false
closedWorkChan := false
// The main body of the routine; continuously select on one of the channels,
Expand All @@ -716,11 +718,17 @@ func (te *TransferEngine) runMux() error {
sortFunc := func(a, b uuid.UUID) int {
return bytes.Compare(a[:], b[:])
}
// Only listen for more incoming work if we're not waiting to send a client job to the
// jobs-to-objects worker
slices.SortFunc(workKeys, sortFunc)
for _, key := range workKeys {
workMap[key] = te.workMap[key]
cases[ctr].Dir = reflect.SelectRecv
cases[ctr].Chan = reflect.ValueOf(workMap[key])
if clientJob == nil {
cases[ctr].Chan = reflect.ValueOf(workMap[key])
} else {
cases[ctr].Chan = reflect.ValueOf(nil)
}
ctr++
}
resultsMap = make(map[uuid.UUID]chan *TransferResults, len(tmpResults))
Expand All @@ -742,9 +750,15 @@ func (te *TransferEngine) runMux() error {
// Notification that a transfer has finished.
cases[ctr+1].Dir = reflect.SelectRecv
cases[ctr+1].Chan = reflect.ValueOf(te.results)
// Placeholder; never used.
cases[ctr+2].Dir = reflect.SelectRecv
cases[ctr+2].Chan = reflect.ValueOf(nil)
// Buffer the jobs to send to the job-to-objects worker.
if clientJob == nil {
cases[ctr+2].Dir = reflect.SelectRecv
cases[ctr+2].Chan = reflect.ValueOf(nil)
} else {
cases[ctr+2].Dir = reflect.SelectSend
cases[ctr+2].Chan = reflect.ValueOf(te.work)
cases[ctr+2].Send = reflect.ValueOf(clientJob)
}
// Notification that the TransferEngine has been cancelled; shutdown immediately
cases[ctr+3].Dir = reflect.SelectRecv
cases[ctr+3].Chan = reflect.ValueOf(te.ctx.Done())
Expand Down Expand Up @@ -787,14 +801,13 @@ func (te *TransferEngine) runMux() error {
continue
}
job := recv.Interface().(*TransferJob)
clientJob := &clientTransferJob{job: job, uuid: id}
clientJob = &clientTransferJob{job: job, uuid: id}
clientJobs := activeJobs[id]
if clientJobs == nil {
clientJobs = make([]*TransferJob, 0)
}
clientJobs = append(clientJobs, job)
activeJobs[id] = clientJobs
te.work <- clientJob
} else if chosen < len(workMap)+len(resultsMap) {
// One of the "write" channels has been sent some results.
id := resultsKeys[chosen-len(workMap)]
Expand Down Expand Up @@ -859,6 +872,10 @@ func (te *TransferEngine) runMux() error {
}
tmpResults[result.id] = append(resultBuffer, &result.results)
}
} else if chosen == len(workMap)+len(resultsMap)+2 {
// Sent the buffered job to the job-to-objects worker. Clear
// out the buffer so that we can pull in more work.
clientJob = nil
} else if chosen == len(workMap)+len(resultsMap)+3 {
// Engine's context has been cancelled; immediately exit.
log.Debugln("Transfer engine has been cancelled")
Expand Down
2 changes: 1 addition & 1 deletion client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func getToken(destination *url.URL, namespace namespaces.Namespace, isWrite bool
err = errors.New("failed to find or generate a token as required for " + destination.String())
return "", err
} else {
log.Errorln("Credential is required, but currently mssing")
log.Errorln("Credential is required, but currently missing")
err := errors.New("Credential is required for " + destination.String() + " but is currently missing")
return "", err
}
Expand Down
6 changes: 6 additions & 0 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,17 @@ func readMultiTransfers(stdin bufio.Reader) (transfers []PluginTransfer, err err
if err != nil {
return nil, err
}
if adUrl == nil {
return nil, errors.New("Url attribute not set for transfer")
}

destination, err := ad.Get("LocalFileName")
if err != nil {
return nil, err
}
if destination == nil {
return nil, errors.New("LocalFileName attribute not set for transfer")
}
transfers = append(transfers, PluginTransfer{url: adUrl, localFile: destination.(string)})
}

Expand Down
69 changes: 69 additions & 0 deletions cmd/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,25 @@ import (
"encoding/json"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/classads"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/fed_test_utils"
"github.com/pelicanplatform/pelican/launchers"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
Expand Down Expand Up @@ -249,6 +253,71 @@ func TestStashPluginMain(t *testing.T) {
assert.Contains(t, output, amountDownloaded)
}

// TestPluginMulti launches a small federation, places two files in the
// origin's export, queues both as plugin transfers, and verifies that the
// plugin worker reports TransferSuccess for every result ad it emits.
// It exercises the multi-transfer path that previously could deadlock.
func TestPluginMulti(t *testing.T) {
	viper.Reset()
	server_utils.ResetOriginExports()

	dirName := t.TempDir()

	viper.Set("Logging.Level", "debug")
	viper.Set("Origin.StorageType", "posix")
	viper.Set("Origin.ExportVolumes", "/test")
	viper.Set("Origin.EnablePublicReads", true)
	fed := fed_test_utils.NewFedTest(t, "")
	host := param.Server_Hostname.GetString() + ":" + strconv.Itoa(param.Server_WebPort.GetInt())

	// Drop two test files into the origin's storage prefix so the plugin
	// has real objects to download.
	destDir := filepath.Join(fed.Exports[0].StoragePrefix, "test")
	require.NoError(t, os.MkdirAll(destDir, os.FileMode(0755)))
	log.Debugln("Will create origin file at", destDir)
	err := os.WriteFile(filepath.Join(destDir, "test.txt"), []byte("test file content"), fs.FileMode(0644))
	require.NoError(t, err)
	downloadUrl1 := url.URL{
		Scheme: "pelican",
		Host:   host,
		Path:   "/test/test/test.txt",
	}
	localPath1 := filepath.Join(dirName, "test.txt")
	err = os.WriteFile(filepath.Join(destDir, "test2.txt"), []byte("second test file content"), fs.FileMode(0644))
	require.NoError(t, err)
	downloadUrl2 := url.URL{
		Scheme: "pelican",
		Host:   host,
		Path:   "/test/test/test2.txt",
	}
	localPath2 := filepath.Join(dirName, "test2.txt")

	// Buffer both transfers, then close the channel so the worker knows
	// no further work is coming.
	workChan := make(chan PluginTransfer, 2)
	workChan <- PluginTransfer{url: &downloadUrl1, localFile: localPath1}
	workChan <- PluginTransfer{url: &downloadUrl2, localFile: localPath2}
	close(workChan)

	results := make(chan *classads.ClassAd, 5)
	fed.Egrp.Go(func() error {
		return runPluginWorker(fed.Ctx, false, workChan, results)
	})

	// Drain results as they arrive. Note that a bare `break` inside a
	// `select` only exits the select, not the enclosing loop, so both
	// exit conditions must set `done` explicitly.
	done := false
	for !done {
		select {
		case <-fed.Ctx.Done():
			// Context cancelled: stop the loop instead of spinning on
			// the closed Done() channel forever.
			done = true
		case resultAd, ok := <-results:
			if !ok {
				// Worker closed the results channel; all transfers done.
				done = true
				break
			}
			// Process results as soon as we get them
			transferSuccess, err := resultAd.Get("TransferSuccess")
			assert.NoError(t, err)
			boolVal, ok := transferSuccess.(bool)
			require.True(t, ok)
			assert.True(t, boolVal)
		}
	}
}

func TestWriteOutfile(t *testing.T) {
t.Run("TestOutfileSuccess", func(t *testing.T) {
// Drop the testFileContent into the origin directory
Expand Down
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,12 @@ func GetPreferredPrefix() string {
arg0 := strings.ToUpper(filepath.Base(os.Args[0]))
underscore_idx := strings.Index(arg0, "_")
if underscore_idx != -1 {
return string(ConfigPrefix(arg0[0:underscore_idx]))
prefix := string(ConfigPrefix(arg0[0:underscore_idx]))
if prefix == "STASH" {
return "OSDF"
}
}
if strings.HasPrefix(arg0, "STASH") {
return "STASH"
} else if strings.HasPrefix(arg0, "OSDF") {
if strings.HasPrefix(arg0, "STASH") || strings.HasPrefix(arg0, "OSDF") {
return "OSDF"
}
return "PELICAN"
Expand Down

0 comments on commit 5c31b60

Please sign in to comment.