Skip to content

Commit

Permalink
Merge branch 'main' of github.com:haoming29/pelican into issue-159
Browse files Browse the repository at this point in the history
  • Loading branch information
haoming29 committed Jan 31, 2024
2 parents cf5def1 + 03a9925 commit a457402
Show file tree
Hide file tree
Showing 27 changed files with 1,414 additions and 133 deletions.
6 changes: 6 additions & 0 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,12 @@ func DownloadHTTP(transfer TransferDetails, dest string, token string) (int64, e
if err != nil {
return 0, err
}
if dest == "." {
dest, err = os.Getwd()
if err != nil {
return 0, errors.Wrap(err, "Failed to get current directory for destination")
}
}
unpacker = newAutoUnpacker(dest, behavior)
if req, err = grab.NewRequestToWriter(unpacker, transfer.Url.String()); err != nil {
return 0, errors.Wrap(err, "Failed to create new download request")
Expand Down
2 changes: 1 addition & 1 deletion cmd/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func serveCacheInternal(cmdCtx context.Context) (context.CancelFunc, error) {
xrootd.LaunchXrootdMaintenance(ctx, cacheServer, 2*time.Minute)

log.Info("Launching cache")
launchers, err := xrootd.ConfigureLaunchers(false, configPath, false)
launchers, err := xrootd.ConfigureLaunchers(false, configPath, false, true)
if err != nil {
return shutdownCancel, err
}
Expand Down
115 changes: 96 additions & 19 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -158,8 +159,6 @@ func stashPluginMain(args []string) {

var source []string
var dest string
var result error
//var downloaded int64 = 0
var transfers []Transfer

if len(args) == 0 && (infile == "" || outfile == "") {
Expand Down Expand Up @@ -209,10 +208,84 @@ func stashPluginMain(args []string) {
defer outputFile.Close()
}

var wg sync.WaitGroup

workChan := make(chan Transfer, len(transfers))
results := make(chan *classads.ClassAd, len(transfers))

// Start workers
for i := 1; i <= 5; i++ {
wg.Add(1)
go moveObjects(source, methods, upload, &wg, workChan, results)
}

success := true
var resultAds []*classads.ClassAd
retryable := false
for _, transfer := range transfers {
counter := 0
done := false
for !done {
// Send to work channel the amount of transfers we have
if counter < len(transfers) {
workChan <- transfers[counter]
counter++
} else if counter == len(transfers) { // Once we sent all the work, close the channel
close(workChan)
// Increment counter so we no longer hit this case
counter++
}
select {
case resultAd := <-results:
// Process results as soon as we get them
transferSuccess, err := resultAd.Get("TransferSuccess")
if err != nil {
log.Errorln("Failed to get TransferSuccess:", err)
success = false
}
// If we are not uploading and we fail, we want to abort
if !upload && !transferSuccess.(bool) {
success = false
// Add the final (failed) result to the resultAds
resultAds = append(resultAds, resultAd)
done = true
break
} else { // Otherwise, we add to end result ads
resultAds = append(resultAds, resultAd)
}
default:
// We are either done or still downloading/uploading
if len(resultAds) == len(transfers) {
log.Debugln("Finished transfering objects! :)")
done = true
break
}
}
}

// Wait for transfers only if successful
if success {
wg.Wait()
}

close(results)

success, retryable := writeOutfile(resultAds, outputFile)

if success {
os.Exit(0)
} else if retryable {
os.Exit(11)
} else {
os.Exit(1)
}
}

// moveObjects performs the appropriate download or upload functions for the plugin as well as
// writes the resultAds for each transfer
// Returns: resultAds and if an error given is retryable
func moveObjects(source []string, methods []string, upload bool, wg *sync.WaitGroup, workChan <-chan Transfer, results chan<- *classads.ClassAd) {
defer wg.Done()
var result error
for transfer := range workChan {
var tmpDownloaded int64
if upload {
source = append(source, transfer.localFile)
Expand Down Expand Up @@ -266,24 +339,27 @@ func stashPluginMain(args []string) {
}
errMsg += transfer.url + ": " + client.GetErrors()
resultAd.Set("TransferError", errMsg)
client.ClearErrors()
}
resultAd.Set("TransferFileBytes", 0)
resultAd.Set("TransferTotalBytes", 0)
if client.ErrorsRetryable() {
resultAd.Set("TransferRetryable", true)
retryable = true
} else {
resultAd.Set("TransferRetryable", false)
retryable = false

}
client.ClearErrors()
}
resultAds = append(resultAds, resultAd)

results <- resultAd
}
}

// WriteOutfile takes in the result ads from the job and the file to be outputted, it returns a boolean indicating:
// true: all result ads indicate transfer success
// false: at least one result ad has failed
// As well as a boolean letting us know if errors are retryable
func writeOutfile(resultAds []*classads.ClassAd, outputFile *os.File) (bool, bool) {
success := true
retryable := false
for _, resultAd := range resultAds {
_, err := outputFile.WriteString(resultAd.String() + "\n")
if err != nil {
Expand All @@ -296,8 +372,16 @@ func stashPluginMain(args []string) {
success = false
}
success = success && transferSuccess.(bool)
// If we do not get a success, check if it is retryable
if !success {
retryableTransfer, err := resultAd.Get("TransferRetryable")
if err != nil {
log.Errorln("Failed to see if ad is retryable", err)
}
retryable = retryableTransfer.(bool)
}
}
if err = outputFile.Sync(); err != nil {
if err := outputFile.Sync(); err != nil {
var perr *fs.PathError
var serr syscall.Errno
// Error code 1 (serr) is ERROR_INVALID_FUNCTION, the expected Windows syscall error
Expand All @@ -314,14 +398,7 @@ func stashPluginMain(args []string) {
os.Exit(1)
}
}

if success {
os.Exit(0)
} else if retryable {
os.Exit(11)
} else {
os.Exit(1)
}
return success, retryable
}

// readMultiTransfers reads the transfers from a Reader, such as stdin
Expand Down
1 change: 1 addition & 0 deletions daemon/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type (
Args []string
Uid int
Gid int
ExtraEnv []string
}
)

Expand Down
25 changes: 25 additions & 0 deletions daemon/launch_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"os/exec"
"os/signal"
"reflect"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -111,6 +112,30 @@ func (launcher DaemonLauncher) Launch(ctx context.Context) (context.Context, int
return ctx, -1, errors.New("If either uid or gid is specified for daemon, both must be specified")
}

if len(launcher.ExtraEnv) > 0 {
// Merge the "extra env" options into the existing OS environment
existingEnv := os.Environ()
newEnv := make([]string, 0)
for _, defEnvStr := range existingEnv {
useEnv := true
for _, newEnvStr := range launcher.ExtraEnv {
if eqPos := strings.IndexByte(newEnvStr, '='); eqPos == -1 {
return ctx, -1, errors.Errorf("Environment override string %s lacking '=' character", newEnvStr)
} else {
envPrefix := newEnvStr[:eqPos]
if strings.HasPrefix(defEnvStr, envPrefix) {
useEnv = false
break
}
}
}
if useEnv {
newEnv = append(newEnv, defEnvStr)
}
}
cmd.Env = append(newEnv, launcher.ExtraEnv...)
}

if err := cmd.Start(); err != nil {
return ctx, -1, err
}
Expand Down
6 changes: 5 additions & 1 deletion github_scripts/osx_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
# Mac OS X instance in GitHub.
#

scriptdir=`dirname $0`

brew install minio xrootd ninja

mkdir dependencies
Expand Down Expand Up @@ -60,11 +62,13 @@ popd

git clone --depth=1 https://github.com/xrootd/xrootd.git
pushd xrootd
patch -p1 $scriptdir/pelican_protocol.patch
mkdir build
cd build
cmake .. -GNinja
ninja libXrdAccSciTokens-5.so
ninja libXrdAccSciTokens-5.so libXrdPss-5.so
sudo ln -s $PWD/src/libXrdAccSciTokens-5.so $xrootd_libdir
sudo ln -sf $PWD/src/libXrdPss-5.so $xrootd_libdir
popd

popd
Expand Down
77 changes: 77 additions & 0 deletions github_scripts/pelican_protocol.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
From 5b7357cb59a1ffe2fb99b68c1dc5796fd063acdb Mon Sep 17 00:00:00 2001
From: Brian Bockelman <bbockelman@morgridge.org>
Date: Thu, 25 Jan 2024 09:46:39 -0600
Subject: [PATCH] Add support for pelican:// protocol

In https://github.com/PelicanPlatform/xrdcl-pelican, we are developing
a XrdCl plugin that can talk to the infrastructure for a new project,
christening the URL scheme `pelican://`.

This commit adds the new schema so it can be utilized from both
xrdcp (primarily for testing) and XCache.
---
src/XrdApps/XrdCpConfig.cc | 2 ++
src/XrdApps/XrdCpFile.cc | 1 +
src/XrdApps/XrdCpFile.hh | 2 +-
src/XrdPss/XrdPssUtils.cc | 3 ++-
4 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/XrdApps/XrdCpConfig.cc b/src/XrdApps/XrdCpConfig.cc
index 890f80198cc..7f8e8702efa 100644
--- a/src/XrdApps/XrdCpConfig.cc
+++ b/src/XrdApps/XrdCpConfig.cc
@@ -385,6 +385,7 @@ do{while(optind < Argc && Legacy(optind)) {}
if (dstFile->Protocol != XrdCpFile::isFile
&& dstFile->Protocol != XrdCpFile::isStdIO
&& dstFile->Protocol != XrdCpFile::isXroot
+ && dstFile->Protocol != XrdCpFile::isPelican
&& (!Want(DoAllowHttp) && ((dstFile->Protocol == XrdCpFile::isHttp) ||
(dstFile->Protocol == XrdCpFile::isHttps))))
{FMSG(dstFile->ProtName <<"file protocol is not supported.", 22)}
@@ -903,6 +904,7 @@ void XrdCpConfig::ProcFile(const char *fname)
}
else if (!((pFile->Protocol == XrdCpFile::isXroot) ||
(pFile->Protocol == XrdCpFile::isXroots) ||
+ (pFile->Protocol == XrdCpFile::isPelican) ||
(Want(DoAllowHttp) && ((pFile->Protocol == XrdCpFile::isHttp) ||
(pFile->Protocol == XrdCpFile::isHttps)))))
{FMSG(pFile->ProtName <<" file protocol is not supported.", 22)}
diff --git a/src/XrdApps/XrdCpFile.cc b/src/XrdApps/XrdCpFile.cc
index a6f8a6496e2..e1e5dc98086 100644
--- a/src/XrdApps/XrdCpFile.cc
+++ b/src/XrdApps/XrdCpFile.cc
@@ -56,6 +56,7 @@ XrdCpFile::XrdCpFile(const char *FSpec, int &badURL)
{"root://", 7, isXroot},
{"roots://", 8, isXroots},
{"http://", 7, isHttp},
+ {"pelican://", 10, isPelican},
{"https://", 8, isHttps}
};
static int pTnum = sizeof(pTab)/sizeof(struct proto);
diff --git a/src/XrdApps/XrdCpFile.hh b/src/XrdApps/XrdCpFile.hh
index ef09301b56c..03972c360d8 100644
--- a/src/XrdApps/XrdCpFile.hh
+++ b/src/XrdApps/XrdCpFile.hh
@@ -38,7 +38,7 @@ class XrdCpFile
public:

enum PType {isOther = 0, isDir, isFile, isStdIO,
- isXroot, isXroots, isHttp, isHttps, isDevNull, isDevZero
+ isXroot, isXroots, isHttp, isHttps, isPelican, isDevNull, isDevZero
};

XrdCpFile *Next; // -> Next file in list
diff --git a/src/XrdPss/XrdPssUtils.cc b/src/XrdPss/XrdPssUtils.cc
index be14fa55c9a..42f37534f14 100644
--- a/src/XrdPss/XrdPssUtils.cc
+++ b/src/XrdPss/XrdPssUtils.cc
@@ -42,7 +42,8 @@ namespace
struct pEnt {const char *pname; int pnlen;} pTab[] =
{{ "https://", 8}, { "http://", 7},
{ "roots://", 8}, { "root://", 7},
- {"xroots://", 9}, {"xroot://", 8}
+ {"xroots://", 9}, {"xroot://", 8},
+ {"pelican://", 10}
};
int pTNum = sizeof(pTab)/sizeof(pEnt);
int xrBeg = 2;
2 changes: 1 addition & 1 deletion launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group)
xrootd.LaunchXrootdMaintenance(ctx, originServer, 2*time.Minute)

privileged := param.Origin_Multiuser.GetBool()
launchers, err := xrootd.ConfigureLaunchers(privileged, configPath, param.Origin_EnableCmsd.GetBool())
launchers, err := xrootd.ConfigureLaunchers(privileged, configPath, param.Origin_EnableCmsd.GetBool(), false)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions registry/registry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func (a AdminMetadata) Equal(b AdminMetadata) bool {
a.UpdatedAt.Equal(b.UpdatedAt)
}

func IsValidRegStatus(s string) bool {
return s == "Pending" || s == "Approved" || s == "Denied" || s == "Unknown"
}

func createNamespaceTable() {
//We put a size limit on admin_metadata to guard against potentially future
//malicious large inserts
Expand Down
1 change: 1 addition & 0 deletions registry/registry_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func setupMockRegistryDB(t *testing.T) {
db = mockDB
require.NoError(t, err, "Error setting up mock namespace DB")
createNamespaceTable()
createTopologyTable()
}

func resetNamespaceDB(t *testing.T) {
Expand Down
Loading

0 comments on commit a457402

Please sign in to comment.