From b853fcfda58771c4f7d6dcdec2ff7b6afeab7c41 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Tue, 12 Dec 2023 13:50:52 -0600 Subject: [PATCH 1/5] First draft of `pelican object get` Commit for the first draft of `pelican object get`, barely tested but functionality seems correct for now. Took a lot from `pelican object copy` and kept its workflow very similar --- client/main.go | 148 +++++++++++++++++++++++++++++++++++++++++++++- cmd/object_get.go | 140 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 287 insertions(+), 1 deletion(-) create mode 100644 cmd/object_get.go diff --git a/client/main.go b/client/main.go index a6e829e68..64cce7991 100644 --- a/client/main.go +++ b/client/main.go @@ -465,7 +465,153 @@ func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns name } } -// Start the transfer, whether read or write back +// Start the transfer for downloads specifically +func DoGet(sourceFile string, destination string, recursive bool) (bytesTransferred int64, err error) { + // First, create a handler for any panics that occur + defer func() { + if r := recover(); r != nil { + log.Debugln("Panic captured while attempting to perform transfer (DoGet):", r) + log.Debugln("Panic caused by the following", string(debug.Stack())) + ret := fmt.Sprintf("Unrecoverable error (panic) captured in DoGet: %v", r) + err = errors.New(ret) + bytesTransferred = 0 + + // Attempt to add the panic to the error accumulator + AddError(errors.New(ret)) + } + }() + + // Parse the source and destination with URL parse + sourceFile, source_scheme := correctURLWithUnderscore(sourceFile) + source_url, err := url.Parse(sourceFile) + if err != nil { + log.Errorln("Failed to parse source URL:", err) + return 0, err + } + source_url.Scheme = source_scheme + + destination, dest_scheme := correctURLWithUnderscore(destination) + dest_url, err := url.Parse(destination) + if err != nil { + log.Errorln("Failed to parse destination URL:", err) + return 0, err + } + 
dest_url.Scheme = dest_scheme + + // If there is a host specified, prepend it to the path in the osdf case + if source_url.Host != "" { + if source_url.Scheme == "osdf" { + source_url.Path = "/" + path.Join(source_url.Host, source_url.Path) + } else if source_url.Scheme == "pelican" { + federationUrl, _ := url.Parse(source_url.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + viper.Set("Federation.DiscoveryUrl", federationUrl.String()) + err = config.DiscoverFederation() + if err != nil { + return 0, err + } + } + } + + sourceScheme, _ := getTokenName(source_url) + + understoodSchemes := []string{"file", "osdf", "pelican", ""} + + _, foundSource := Find(understoodSchemes, sourceScheme) + if !foundSource { + log.Errorln("Do not understand source scheme:", source_url.Scheme) + return 0, errors.New("Do not understand source scheme") + } + + if sourceScheme == "osdf" || sourceScheme == "pelican" { + sourceFile = source_url.Path + } + + if string(sourceFile[0]) != "/" { + sourceFile = "/" + sourceFile + } + + OSDFDirectorUrl := param.Federation_DirectorUrl.GetString() + useOSDFDirector := viper.IsSet("Federation.DirectorURL") + + var ns namespaces.Namespace + if useOSDFDirector { + dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl) + if err != nil { + log.Errorln("Error while querying the Director:", err) + AddError(err) + return 0, err + } + ns, err = CreateNsFromDirectorResp(dirResp) + if err != nil { + AddError(err) + return 0, err + } + } else { + ns, err = namespaces.MatchNamespace(source_url.Path) + if err != nil { + AddError(err) + return 0, err + } + } + + // get absolute path + destPath, _ := filepath.Abs(destination) + + //Check if path exists or if its in a folder + if destStat, err := os.Stat(destPath); os.IsNotExist(err) { + destination = destPath + } else if destStat.IsDir() && source_url.Query().Get("pack") == "" { + // If we have an auto-pack request, it's OK for the destination to be a directory + // Otherwise, get the base 
name of the source and append it to the destination dir. + sourceFilename := path.Base(sourceFile) + destination = path.Join(destPath, sourceFilename) + } + + payload := payloadStruct{} + payload.version = version + + //Fill out the payload as much as possible + payload.filename = source_url.Path + + parse_job_ad(payload) + + payload.start1 = time.Now().Unix() + + success := false + + _, token_name := getTokenName(source_url) + + var downloaded int64 + if downloaded, err = download_http(source_url, destination, &payload, ns, recursive, token_name, OSDFDirectorUrl); err == nil { + success = true + } + + payload.end1 = time.Now().Unix() + + payload.timestamp = payload.end1 + payload.downloadTime = (payload.end1 - payload.start1) + + if success { + payload.status = "Success" + + // Get the final size of the download file + payload.fileSize = downloaded + payload.downloadSize = downloaded + } else { + log.Error("Http GET failed! Unable to download file.") + payload.status = "Fail" + } + + if !success { + return downloaded, errors.New("failed to download file") + } else { + return downloaded, nil + } +} + +// Start the transfer, whether read or write back. Primarily used for backwards compatibility func DoStashCPSingle(sourceFile string, destination string, methods []string, recursive bool) (bytesTransferred int64, err error) { // First, create a handler for any panics that occur diff --git a/cmd/object_get.go b/cmd/object_get.go new file mode 100644 index 000000000..90309cfae --- /dev/null +++ b/cmd/object_get.go @@ -0,0 +1,140 @@ +/*************************************************************** +* +* Copyright (C) 2023, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file except in compliance with the License. 
You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +***************************************************************/ + +package main + +import ( + "os" + + "github.com/pelicanplatform/pelican/client" + "github.com/pelicanplatform/pelican/config" + //"github.com/pelicanplatform/pelican/namespaces" maybe wanna have the namespaces bool still? + "github.com/pelicanplatform/pelican/param" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + getCmd = &cobra.Command{ + Use: "get {source ...} {destination}", + Short: "get a file from a Pelican federation", + Run: getMain, + } +) + +func init() { + flagSet := getCmd.Flags() + flagSet.StringP("cache", "c", "", "Cache to use") + flagSet.StringP("token", "t", "", "Token file to use for transfer") + flagSet.BoolP("recursive", "r", false, "Recursively download a directory. 
Forces methods to only be http to get the freshest directory contents") + flagSet.StringP("cache-list-name", "n", "xroot", "(Deprecated) Cache list to use, currently either xroot or xroots; may be ignored") + flagSet.Lookup("cache-list-name").Hidden = true + flagSet.String("caches", "", "A JSON file containing the list of caches") + objectCmd.AddCommand(getCmd) + +} + +func getMain(cmd *cobra.Command, args []string) { + + client.ObjectClientOptions.Version = version + + err := config.InitClient() + if err != nil { + log.Errorln(err) + os.Exit(1) + } + + // Set the token file to the command line option + client.ObjectClientOptions.Token, _ = cmd.Flags().GetString("token") + + // Check if the program was executed from a terminal + // https://rosettacode.org/wiki/Check_output_device_is_a_terminal#Go + if fileInfo, _ := os.Stdout.Stat(); (fileInfo.Mode()&os.ModeCharDevice) != 0 && param.Logging_LogLocation.GetString() == "" { + client.ObjectClientOptions.ProgressBars = true + } else { + client.ObjectClientOptions.ProgressBars = false + } + + log.Debugln("Len of source:", len(args)) + if len(args) < 2 { + log.Errorln("No Source or Destination") + err = cmd.Help() + if err != nil { + log.Errorln("Failed to print out help:", err) + } + os.Exit(1) + } + source := args[:len(args)-1] + dest := args[len(args)-1] + + log.Debugln("Sources:", source) + log.Debugln("Destination:", dest) + + // Check for manually entered cache to use ?? 
+ nearestCache, nearestCacheIsPresent := os.LookupEnv("NEAREST_CACHE") + + if nearestCacheIsPresent { + client.NearestCache = nearestCache + client.NearestCacheList = append(client.NearestCacheList, client.NearestCache) + client.CacheOverride = true + } else if cache, _ := cmd.Flags().GetString("cache"); cache != "" { + client.NearestCache = cache + client.NearestCacheList = append(client.NearestCacheList, cache) + client.CacheOverride = true + } + + if len(source) > 1 { + if destStat, err := os.Stat(dest); err != nil && destStat.IsDir() { + log.Errorln("Destination is not a directory") + os.Exit(1) + } + } + + var result error + var downloaded int64 = 0 + lastSrc := "" + for _, src := range source { + var tmpDownloaded int64 + isRecursive, _ := cmd.Flags().GetBool("recursive") + client.ObjectClientOptions.Recursive = isRecursive + tmpDownloaded, result = client.DoGet(src, dest, isRecursive) + downloaded += tmpDownloaded + if result != nil { + lastSrc = src + break + } else { + client.ClearErrors() + } + } + + // Exit with failure + if result != nil { + // Print the list of errors + errMsg := client.GetErrors() + if errMsg == "" { + errMsg = result.Error() + } + log.Errorln("Failure getting " + lastSrc + ": " + errMsg) + if client.ErrorsRetryable() { + log.Errorln("Errors are retryable") + os.Exit(11) + } + os.Exit(1) + } + +} From 94e237d1c5ae0e8d49653d0fbc4c9773f1eb8659 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Tue, 12 Dec 2023 16:00:10 -0600 Subject: [PATCH 2/5] First draft of `pelican object put` First draft and not tested that much but basic functionality is there --- client/main.go | 70 +++++++++++++++++++++++ cmd/object_get.go | 3 +- cmd/object_put.go | 139 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 210 insertions(+), 2 deletions(-) create mode 100644 cmd/object_put.go diff --git a/client/main.go b/client/main.go index 64cce7991..c295abbeb 100644 --- a/client/main.go +++ b/client/main.go @@ -464,6 +464,76 @@ func 
getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns name return } } +func DoPut(sourceFile string, destination string, recursive bool) (bytesTransferred int64, err error) { + // First, create a handler for any panics that occur + defer func() { + if r := recover(); r != nil { + log.Debugln("Panic captured while attempting to perform transfer (DoPut):", r) + log.Debugln("Panic caused by the following", string(debug.Stack())) + ret := fmt.Sprintf("Unrecoverable error (panic) captured in DoPut: %v", r) + err = errors.New(ret) + bytesTransferred = 0 + + // Attempt to add the panic to the error accumulator + AddError(errors.New(ret)) + } + }() + + // Parse the source and destination with URL parse + sourceFile, source_scheme := correctURLWithUnderscore(sourceFile) + source_url, err := url.Parse(sourceFile) + if err != nil { + log.Errorln("Failed to parse source URL:", err) + return 0, err + } + source_url.Scheme = source_scheme + + destination, dest_scheme := correctURLWithUnderscore(destination) + dest_url, err := url.Parse(destination) + if err != nil { + log.Errorln("Failed to parse destination URL:", err) + return 0, err + } + dest_url.Scheme = dest_scheme + + if dest_url.Host != "" { + if dest_url.Scheme == "osdf" || dest_url.Scheme == "stash" { + dest_url.Path = "/" + path.Join(dest_url.Host, dest_url.Path) + } else if dest_url.Scheme == "pelican" { + federationUrl, _ := url.Parse(dest_url.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + viper.Set("Federation.DiscoveryUrl", federationUrl.String()) + err = config.DiscoverFederation() + if err != nil { + return 0, err + } + } + } + destScheme, _ := getTokenName(dest_url) + + understoodSchemes := []string{"file", "osdf", "pelican", ""} + + _, foundDest := Find(understoodSchemes, destScheme) + if !foundDest { + log.Errorln("Do not understand destination scheme:", source_url.Scheme) + return 0, errors.New("Do not understand destination scheme") + } + + // Get the namespace of 
the remote filesystem + // For write back, it will be the destination + // For read it will be the source. + ns, err := namespaces.MatchNamespace(dest_url.Path) + if err != nil { + log.Errorln("Failed to get namespace information:", err) + AddError(err) + return 0, err + } + var transferred int64 + transferred, err = doWriteBack(source_url.Path, dest_url, ns, recursive) + AddError(err) + return transferred, err +} // Start the transfer for downloads specifically func DoGet(sourceFile string, destination string, recursive bool) (bytesTransferred int64, err error) { diff --git a/cmd/object_get.go b/cmd/object_get.go index 90309cfae..36fe1d4bd 100644 --- a/cmd/object_get.go +++ b/cmd/object_get.go @@ -23,7 +23,6 @@ import ( "github.com/pelicanplatform/pelican/client" "github.com/pelicanplatform/pelican/config" - //"github.com/pelicanplatform/pelican/namespaces" maybe wanna have the namespaces bool still? "github.com/pelicanplatform/pelican/param" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -32,7 +31,7 @@ import ( var ( getCmd = &cobra.Command{ Use: "get {source ...} {destination}", - Short: "get a file from a Pelican federation", + Short: "Get a file from a Pelican federation", Run: getMain, } ) diff --git a/cmd/object_put.go b/cmd/object_put.go new file mode 100644 index 000000000..fc6161d44 --- /dev/null +++ b/cmd/object_put.go @@ -0,0 +1,139 @@ +/*************************************************************** +* +* Copyright (C) 2023, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file except in compliance with the License. You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +* +***************************************************************/ + +package main + +import ( + "os" + + "github.com/pelicanplatform/pelican/client" + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/param" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + putCmd = &cobra.Command{ + Use: "put {source ...} {destination}", + Short: "Send a file to a Pelican federation", + Run: putMain, + } +) + +func init() { + flagSet := putCmd.Flags() + flagSet.StringP("cache", "c", "", "Cache to use") + flagSet.StringP("token", "t", "", "Token file to use for transfer") + flagSet.BoolP("recursive", "r", false, "Recursively upload a directory. Forces methods to only be http to get the freshest directory contents") + flagSet.StringP("cache-list-name", "n", "xroot", "(Deprecated) Cache list to use, currently either xroot or xroots; may be ignored") + flagSet.Lookup("cache-list-name").Hidden = true + flagSet.String("caches", "", "A JSON file containing the list of caches") + objectCmd.AddCommand(putCmd) + +} + +func putMain(cmd *cobra.Command, args []string) { + + client.ObjectClientOptions.Version = version + + err := config.InitClient() + if err != nil { + log.Errorln(err) + os.Exit(1) + } + + // Set the token file to the command line option + client.ObjectClientOptions.Token, _ = cmd.Flags().GetString("token") + + // Check if the program was executed from a terminal + // https://rosettacode.org/wiki/Check_output_device_is_a_terminal#Go + if fileInfo, _ := os.Stdout.Stat(); (fileInfo.Mode()&os.ModeCharDevice) != 0 && param.Logging_LogLocation.GetString() == "" { + client.ObjectClientOptions.ProgressBars = true + } else { + client.ObjectClientOptions.ProgressBars = false + } + + log.Debugln("Len of source:", len(args)) + if len(args) < 2 { + log.Errorln("No Source or Destination") + err = cmd.Help() + if err != nil { 
+ log.Errorln("Failed to print out help:", err) + } + os.Exit(1) + } + source := args[:len(args)-1] + dest := args[len(args)-1] + + log.Debugln("Sources:", source) + log.Debugln("Destination:", dest) + + // Check for manually entered cache to use ?? + nearestCache, nearestCacheIsPresent := os.LookupEnv("NEAREST_CACHE") + + if nearestCacheIsPresent { + client.NearestCache = nearestCache + client.NearestCacheList = append(client.NearestCacheList, client.NearestCache) + client.CacheOverride = true + } else if cache, _ := cmd.Flags().GetString("cache"); cache != "" { + client.NearestCache = cache + client.NearestCacheList = append(client.NearestCacheList, cache) + client.CacheOverride = true + } + + if len(source) > 1 { + if destStat, err := os.Stat(dest); err != nil && destStat.IsDir() { + log.Errorln("Destination is not a directory") + os.Exit(1) + } + } + + var result error + var downloaded int64 = 0 + lastSrc := "" + for _, src := range source { + var tmpDownloaded int64 + isRecursive, _ := cmd.Flags().GetBool("recursive") + client.ObjectClientOptions.Recursive = isRecursive + tmpDownloaded, result = client.DoPut(src, dest, isRecursive) + downloaded += tmpDownloaded + if result != nil { + lastSrc = src + break + } else { + client.ClearErrors() + } + } + + // Exit with failure + if result != nil { + // Print the list of errors + errMsg := client.GetErrors() + if errMsg == "" { + errMsg = result.Error() + } + log.Errorln("Failure putting " + lastSrc + ": " + errMsg) + if client.ErrorsRetryable() { + log.Errorln("Errors are retryable") + os.Exit(11) + } + os.Exit(1) + } + +} From c2abdac6645a7a6d854783b8bddadb6876ba2792 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Wed, 10 Jan 2024 12:19:54 -0600 Subject: [PATCH 3/5] Fixes to object get/put Addresses Justin's suggestions for object get/put, cleaning things up and allowing PUTs to work with the director --- client/handle_http.go | 5 +- client/main.go | 193 +++++++++++++++++------------------------- cmd/object_get.go | 
2 - cmd/object_put.go | 18 ---- 4 files changed, 79 insertions(+), 139 deletions(-) diff --git a/client/handle_http.go b/client/handle_http.go index df9f58a92..16975fc75 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -264,7 +264,7 @@ func GenerateTransferDetailsUsingCache(cache CacheInterface, opts TransferDetail return nil } -func download_http(sourceUrl *url.URL, destination string, payload *payloadStruct, namespace namespaces.Namespace, recursive bool, tokenName string, OSDFDirectorUrl string) (bytesTransferred int64, err error) { +func download_http(sourceUrl *url.URL, destination string, payload *payloadStruct, namespace namespaces.Namespace, recursive bool, tokenName string) (bytesTransferred int64, err error) { // First, create a handler for any panics that occur defer func() { @@ -298,7 +298,8 @@ func download_http(sourceUrl *url.URL, destination string, payload *payloadStruc // Check the env var "USE_OSDF_DIRECTOR" and decide if ordered caches should come from director var transfers []TransferDetails var files []string - closestNamespaceCaches, err := GetCachesFromNamespace(namespace, OSDFDirectorUrl != "") + directorUrl := param.Federation_DirectorUrl.GetString() + closestNamespaceCaches, err := GetCachesFromNamespace(namespace, directorUrl != "") if err != nil { log.Errorln("Failed to get namespaced caches (treated as non-fatal):", err) } diff --git a/client/main.go b/client/main.go index c295abbeb..1bceedc4b 100644 --- a/client/main.go +++ b/client/main.go @@ -464,7 +464,16 @@ func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns name return } } -func DoPut(sourceFile string, destination string, recursive bool) (bytesTransferred int64, err error) { + +/* + Start of transfer for pelican object put, gets information from the target destination before doing our HTTP PUT request + +localObject: the source file/directory you would like to upload +remoteDestination: the end location of the upload +recursive: a boolean 
indicating if the source is a directory or not +*/ +func DoPut(localObject string, remoteDestination string, recursive bool) (bytesTransferred int64, err error) { + isPut := true // First, create a handler for any panics that occur defer func() { if r := recover(); r != nil { @@ -480,63 +489,66 @@ func DoPut(sourceFile string, destination string, recursive bool) (bytesTransfer }() // Parse the source and destination with URL parse - sourceFile, source_scheme := correctURLWithUnderscore(sourceFile) - source_url, err := url.Parse(sourceFile) + localObjectUrl, err := url.Parse(localObject) if err != nil { log.Errorln("Failed to parse source URL:", err) return 0, err } - source_url.Scheme = source_scheme - destination, dest_scheme := correctURLWithUnderscore(destination) - dest_url, err := url.Parse(destination) + remoteDestination, remoteDestScheme := correctURLWithUnderscore(remoteDestination) + remoteDestUrl, err := url.Parse(remoteDestination) if err != nil { - log.Errorln("Failed to parse destination URL:", err) + log.Errorln("Failed to parse remote destination URL:", err) return 0, err } - dest_url.Scheme = dest_scheme + remoteDestUrl.Scheme = remoteDestScheme - if dest_url.Host != "" { - if dest_url.Scheme == "osdf" || dest_url.Scheme == "stash" { - dest_url.Path = "/" + path.Join(dest_url.Host, dest_url.Path) - } else if dest_url.Scheme == "pelican" { - federationUrl, _ := url.Parse(dest_url.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() + if remoteDestUrl.Host != "" { + if remoteDestUrl.Scheme == "osdf" || remoteDestUrl.Scheme == "stash" { + remoteDestUrl.Path, err = url.JoinPath(remoteDestUrl.Host, remoteDestUrl.Path) if err != nil { + log.Errorln("Failed to join remote destination url path:", err) return 0, err } } } - destScheme, _ := getTokenName(dest_url) + remoteDestScheme, _ = getTokenName(remoteDestUrl) understoodSchemes := 
[]string{"file", "osdf", "pelican", ""} - _, foundDest := Find(understoodSchemes, destScheme) + _, foundDest := Find(understoodSchemes, remoteDestScheme) if !foundDest { - log.Errorln("Do not understand destination scheme:", source_url.Scheme) - return 0, errors.New("Do not understand destination scheme") + return 0, fmt.Errorf("Do not understand the destination scheme: %s. Permitted values are %s", + remoteDestUrl.Scheme, strings.Join(understoodSchemes, ", ")) } + directorUrl := param.Federation_DirectorUrl.GetString() + // Get the namespace of the remote filesystem // For write back, it will be the destination - // For read it will be the source. - ns, err := namespaces.MatchNamespace(dest_url.Path) + if !strings.HasPrefix(remoteDestination, "/") { + remoteDestination = strings.TrimPrefix(remoteDestination, remoteDestScheme+"://") + } + ns, err := getNamespaceInfo(remoteDestination, directorUrl, isPut) if err != nil { - log.Errorln("Failed to get namespace information:", err) - AddError(err) - return 0, err + log.Errorln(err) + return 0, errors.New("Failed to get namespace information from source") } - var transferred int64 - transferred, err = doWriteBack(source_url.Path, dest_url, ns, recursive) + uploadedBytes, err := doWriteBack(localObjectUrl.Path, remoteDestUrl, ns, recursive) AddError(err) - return transferred, err + return uploadedBytes, err + } -// Start the transfer for downloads specifically -func DoGet(sourceFile string, destination string, recursive bool) (bytesTransferred int64, err error) { +/* + Start of transfer for pelican object get, gets information from the target source before doing our HTTP GET request + +remoteObject: the source file/directory you would like to download +localDestination: the end location of the download +recursive: a boolean indicating if the source is a directory or not +*/ +func DoGet(remoteObject string, localDestination string, recursive bool) (bytesTransferred int64, err error) { + isPut := false // First, create a 
handler for any panics that occur defer func() { if r := recover(); r != nil { @@ -551,99 +563,70 @@ func DoGet(sourceFile string, destination string, recursive bool) (bytesTransfer } }() - // Parse the source and destination with URL parse - sourceFile, source_scheme := correctURLWithUnderscore(sourceFile) - source_url, err := url.Parse(sourceFile) + // Parse the source with URL parse + remoteObject, remoteObjectScheme := correctURLWithUnderscore(remoteObject) + remoteObjectUrl, err := url.Parse(remoteObject) if err != nil { log.Errorln("Failed to parse source URL:", err) return 0, err } - source_url.Scheme = source_scheme - - destination, dest_scheme := correctURLWithUnderscore(destination) - dest_url, err := url.Parse(destination) - if err != nil { - log.Errorln("Failed to parse destination URL:", err) - return 0, err - } - dest_url.Scheme = dest_scheme + remoteObjectUrl.Scheme = remoteObjectScheme // If there is a host specified, prepend it to the path in the osdf case - if source_url.Host != "" { - if source_url.Scheme == "osdf" { - source_url.Path = "/" + path.Join(source_url.Host, source_url.Path) - } else if source_url.Scheme == "pelican" { - federationUrl, _ := url.Parse(source_url.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() + if remoteObjectUrl.Host != "" { + if remoteObjectUrl.Scheme == "osdf" { + remoteObjectUrl.Path, err = url.JoinPath(remoteObjectUrl.Host, remoteObjectUrl.Path) if err != nil { + log.Errorln("Failed to join source url path:", err) return 0, err } } } - sourceScheme, _ := getTokenName(source_url) + remoteObjectScheme, _ = getTokenName(remoteObjectUrl) understoodSchemes := []string{"file", "osdf", "pelican", ""} - _, foundSource := Find(understoodSchemes, sourceScheme) + _, foundSource := Find(understoodSchemes, remoteObjectScheme) if !foundSource { - log.Errorln("Do not understand source scheme:", 
source_url.Scheme) - return 0, errors.New("Do not understand source scheme") + return 0, fmt.Errorf("Do not understand the source scheme: %s. Permitted values are %s", + remoteObjectUrl.Scheme, strings.Join(understoodSchemes, ", ")) } - if sourceScheme == "osdf" || sourceScheme == "pelican" { - sourceFile = source_url.Path + if remoteObjectScheme == "osdf" || remoteObjectScheme == "pelican" { + remoteObject = remoteObjectUrl.Path } - if string(sourceFile[0]) != "/" { - sourceFile = "/" + sourceFile + if string(remoteObject[0]) != "/" { + remoteObject = "/" + remoteObject } - OSDFDirectorUrl := param.Federation_DirectorUrl.GetString() - useOSDFDirector := viper.IsSet("Federation.DirectorURL") + directorUrl := param.Federation_DirectorUrl.GetString() - var ns namespaces.Namespace - if useOSDFDirector { - dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl) - if err != nil { - log.Errorln("Error while querying the Director:", err) - AddError(err) - return 0, err - } - ns, err = CreateNsFromDirectorResp(dirResp) - if err != nil { - AddError(err) - return 0, err - } - } else { - ns, err = namespaces.MatchNamespace(source_url.Path) - if err != nil { - AddError(err) - return 0, err - } + ns, err := getNamespaceInfo(remoteObject, directorUrl, isPut) + if err != nil { + log.Errorln(err) + return 0, errors.New("Failed to get namespace information from source") } // get absolute path - destPath, _ := filepath.Abs(destination) + localDestPath, _ := filepath.Abs(localDestination) //Check if path exists or if its in a folder - if destStat, err := os.Stat(destPath); os.IsNotExist(err) { - destination = destPath - } else if destStat.IsDir() && source_url.Query().Get("pack") == "" { + if destStat, err := os.Stat(localDestPath); os.IsNotExist(err) { + localDestination = localDestPath + } else if destStat.IsDir() && remoteObjectUrl.Query().Get("pack") == "" { // If we have an auto-pack request, it's OK for the destination to be a directory // Otherwise, get the base name of the 
source and append it to the destination dir. - sourceFilename := path.Base(sourceFile) - destination = path.Join(destPath, sourceFilename) + remoteObjectFilename := path.Base(remoteObject) + localDestination = path.Join(localDestPath, remoteObjectFilename) } payload := payloadStruct{} payload.version = version //Fill out the payload as much as possible - payload.filename = source_url.Path + payload.filename = remoteObjectUrl.Path parse_job_ad(payload) @@ -651,10 +634,10 @@ func DoGet(sourceFile string, destination string, recursive bool) (bytesTransfer success := false - _, token_name := getTokenName(source_url) + _, token_name := getTokenName(remoteObjectUrl) var downloaded int64 - if downloaded, err = download_http(source_url, destination, &payload, ns, recursive, token_name, OSDFDirectorUrl); err == nil { + if downloaded, err = download_http(remoteObjectUrl, localDestination, &payload, ns, recursive, token_name); err == nil { success = true } @@ -719,30 +702,12 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re if source_url.Host != "" { if source_url.Scheme == "osdf" || source_url.Scheme == "stash" { source_url.Path = "/" + path.Join(source_url.Host, source_url.Path) - } else if source_url.Scheme == "pelican" { - federationUrl, _ := url.Parse(source_url.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() - if err != nil { - return 0, err - } } } if dest_url.Host != "" { if dest_url.Scheme == "osdf" || dest_url.Scheme == "stash" { dest_url.Path = "/" + path.Join(dest_url.Host, dest_url.Path) - } else if dest_url.Scheme == "pelican" { - federationUrl, _ := url.Parse(dest_url.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() - if err != nil { - return 0, err - } } } @@ -843,7 +808,7 @@ Loop: switch 
method { case "http": log.Info("Trying HTTP...") - if downloaded, err = download_http(source_url, destination, &payload, ns, recursive, token_name, OSDFDirectorUrl); err == nil { + if downloaded, err = download_http(source_url, destination, &payload, ns, recursive, token_name); err == nil { success = true break Loop } @@ -864,17 +829,11 @@ Loop: // Get the final size of the download file payload.fileSize = downloaded payload.downloadSize = downloaded + return downloaded, nil } else { - log.Error("All methods failed! Unable to download file.") payload.status = "Fail" + return downloaded, errors.New("All methods failed! Unable to download file.") } - - if !success { - return downloaded, errors.New("failed to download file") - } else { - return downloaded, nil - } - } // Find takes a slice and looks for an element in it. If found it will diff --git a/cmd/object_get.go b/cmd/object_get.go index 36fe1d4bd..edd750a9e 100644 --- a/cmd/object_get.go +++ b/cmd/object_get.go @@ -45,7 +45,6 @@ func init() { flagSet.Lookup("cache-list-name").Hidden = true flagSet.String("caches", "", "A JSON file containing the list of caches") objectCmd.AddCommand(getCmd) - } func getMain(cmd *cobra.Command, args []string) { @@ -135,5 +134,4 @@ func getMain(cmd *cobra.Command, args []string) { } os.Exit(1) } - } diff --git a/cmd/object_put.go b/cmd/object_put.go index fc6161d44..6f196d38b 100644 --- a/cmd/object_put.go +++ b/cmd/object_put.go @@ -38,14 +38,9 @@ var ( func init() { flagSet := putCmd.Flags() - flagSet.StringP("cache", "c", "", "Cache to use") flagSet.StringP("token", "t", "", "Token file to use for transfer") flagSet.BoolP("recursive", "r", false, "Recursively upload a directory. 
Forces methods to only be http to get the freshest directory contents") - flagSet.StringP("cache-list-name", "n", "xroot", "(Deprecated) Cache list to use, currently either xroot or xroots; may be ignored") - flagSet.Lookup("cache-list-name").Hidden = true - flagSet.String("caches", "", "A JSON file containing the list of caches") objectCmd.AddCommand(putCmd) - } func putMain(cmd *cobra.Command, args []string) { @@ -84,19 +79,6 @@ func putMain(cmd *cobra.Command, args []string) { log.Debugln("Sources:", source) log.Debugln("Destination:", dest) - // Check for manually entered cache to use ?? - nearestCache, nearestCacheIsPresent := os.LookupEnv("NEAREST_CACHE") - - if nearestCacheIsPresent { - client.NearestCache = nearestCache - client.NearestCacheList = append(client.NearestCacheList, client.NearestCache) - client.CacheOverride = true - } else if cache, _ := cmd.Flags().GetString("cache"); cache != "" { - client.NearestCache = cache - client.NearestCacheList = append(client.NearestCacheList, cache) - client.CacheOverride = true - } - if len(source) > 1 { if destStat, err := os.Stat(dest); err != nil && destStat.IsDir() { log.Errorln("Destination is not a directory") From 19f093aca12a23081e035cb653e3e7e7aa317716 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Wed, 10 Jan 2024 13:23:44 -0600 Subject: [PATCH 4/5] Changes to allow stash_plugin and test to succeed --- client/main.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/client/main.go b/client/main.go index 1bceedc4b..aff979a43 100644 --- a/client/main.go +++ b/client/main.go @@ -510,6 +510,15 @@ func DoPut(localObject string, remoteDestination string, recursive bool) (bytesT log.Errorln("Failed to join remote destination url path:", err) return 0, err } + } else if remoteDestUrl.Scheme == "pelican" { + federationUrl, _ := url.Parse(remoteDestUrl.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + viper.Set("Federation.DiscoveryUrl", 
federationUrl.String()) + err = config.DiscoverFederation() + if err != nil { + return 0, err + } } } remoteDestScheme, _ = getTokenName(remoteDestUrl) @@ -580,6 +589,15 @@ func DoGet(remoteObject string, localDestination string, recursive bool) (bytesT log.Errorln("Failed to join source url path:", err) return 0, err } + } else if remoteObjectUrl.Scheme == "pelican" { + federationUrl, _ := url.Parse(remoteObjectUrl.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + viper.Set("Federation.DiscoveryUrl", federationUrl.String()) + err = config.DiscoverFederation() + if err != nil { + return 0, err + } } } @@ -702,12 +720,31 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re if source_url.Host != "" { if source_url.Scheme == "osdf" || source_url.Scheme == "stash" { source_url.Path = "/" + path.Join(source_url.Host, source_url.Path) + } else if source_url.Scheme == "pelican" { + fmt.Printf("\n\n\nHERE\n\n\n") + federationUrl, _ := url.Parse(source_url.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + viper.Set("Federation.DiscoveryUrl", federationUrl.String()) + err = config.DiscoverFederation() + if err != nil { + return 0, err + } } } if dest_url.Host != "" { if dest_url.Scheme == "osdf" || dest_url.Scheme == "stash" { dest_url.Path = "/" + path.Join(dest_url.Host, dest_url.Path) + } else if dest_url.Scheme == "pelican" { + federationUrl, _ := url.Parse(dest_url.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + viper.Set("Federation.DiscoveryUrl", federationUrl.String()) + err = config.DiscoverFederation() + if err != nil { + return 0, err + } } } From bde8d971f65aeadd5dd1d0cc63e4fed41ca90204 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Wed, 10 Jan 2024 13:39:47 -0600 Subject: [PATCH 5/5] Removed debug print statement --- client/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/client/main.go b/client/main.go index aff979a43..e55b96e01 100644 --- a/client/main.go 
+++ b/client/main.go @@ -721,7 +721,6 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re if source_url.Scheme == "osdf" || source_url.Scheme == "stash" { source_url.Path = "/" + path.Join(source_url.Host, source_url.Path) } else if source_url.Scheme == "pelican" { - fmt.Printf("\n\n\nHERE\n\n\n") federationUrl, _ := url.Parse(source_url.String()) federationUrl.Scheme = "https" federationUrl.Path = ""