Skip to content

Commit

Permalink
Merge pull request #490 from joereuss12/object-get-and-put-branch
Browse files Browse the repository at this point in the history
`pelican object get` and `pelican object put`
  • Loading branch information
jhiemstrawisc authored Jan 10, 2024
2 parents 6147d44 + bde8d97 commit 4c09b62
Show file tree
Hide file tree
Showing 4 changed files with 482 additions and 12 deletions.
5 changes: 3 additions & 2 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func GenerateTransferDetailsUsingCache(cache CacheInterface, opts TransferDetail
return nil
}

func download_http(sourceUrl *url.URL, destination string, payload *payloadStruct, namespace namespaces.Namespace, recursive bool, tokenName string, OSDFDirectorUrl string) (bytesTransferred int64, err error) {
func download_http(sourceUrl *url.URL, destination string, payload *payloadStruct, namespace namespaces.Namespace, recursive bool, tokenName string) (bytesTransferred int64, err error) {

// First, create a handler for any panics that occur
defer func() {
Expand Down Expand Up @@ -298,7 +298,8 @@ func download_http(sourceUrl *url.URL, destination string, payload *payloadStruc
// Check the env var "USE_OSDF_DIRECTOR" and decide if ordered caches should come from director
var transfers []TransferDetails
var files []string
closestNamespaceCaches, err := GetCachesFromNamespace(namespace, OSDFDirectorUrl != "")
directorUrl := param.Federation_DirectorUrl.GetString()
closestNamespaceCaches, err := GetCachesFromNamespace(namespace, directorUrl != "")
if err != nil {
log.Errorln("Failed to get namespaced caches (treated as non-fatal):", err)
}
Expand Down
231 changes: 221 additions & 10 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,224 @@ func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns name
}
}

// Start the transfer, whether read or write back
// DoPut is the entry point for `pelican object put`. It gathers information
// about the remote destination (scheme, federation discovery, namespace) and
// then hands the upload off to doWriteBack.
//
//   - localObject: the source file/directory you would like to upload
//   - remoteDestination: the end location of the upload
//   - recursive: a boolean indicating if the source is a directory or not
//
// It returns the number of bytes reported by doWriteBack. A panic raised while
// the transfer is being set up or performed is recovered, added to the error
// accumulator, and reported through err with bytesTransferred reset to 0.
func DoPut(localObject string, remoteDestination string, recursive bool) (bytesTransferred int64, err error) {
	isPut := true
	// First, create a handler for any panics that occur
	defer func() {
		if r := recover(); r != nil {
			log.Debugln("Panic captured while attempting to perform transfer (DoPut):", r)
			log.Debugln("Panic caused by the following", string(debug.Stack()))
			ret := fmt.Sprintf("Unrecoverable error (panic) captured in DoPut: %v", r)
			err = errors.New(ret)
			bytesTransferred = 0

			// Attempt to add the panic to the error accumulator
			AddError(errors.New(ret))
		}
	}()

	// Parse the source and destination with URL parse
	localObjectUrl, err := url.Parse(localObject)
	if err != nil {
		log.Errorln("Failed to parse source URL:", err)
		return 0, err
	}

	remoteDestination, remoteDestScheme := correctURLWithUnderscore(remoteDestination)
	remoteDestUrl, err := url.Parse(remoteDestination)
	if err != nil {
		log.Errorln("Failed to parse remote destination URL:", err)
		return 0, err
	}
	remoteDestUrl.Scheme = remoteDestScheme

	if remoteDestUrl.Host != "" {
		switch remoteDestUrl.Scheme {
		case "osdf", "stash":
			// The "host" is really the first path component; fold it into the path.
			remoteDestUrl.Path, err = url.JoinPath(remoteDestUrl.Host, remoteDestUrl.Path)
			if err != nil {
				log.Errorln("Failed to join remote destination url path:", err)
				return 0, err
			}
		case "pelican":
			// The host names the federation; use it as the discovery URL.
			// The parse error is checked so a bad URL surfaces as an error
			// instead of a nil-pointer dereference on federationUrl.
			federationUrl, parseErr := url.Parse(remoteDestUrl.String())
			if parseErr != nil {
				log.Errorln("Failed to parse remote destination URL:", parseErr)
				return 0, parseErr
			}
			federationUrl.Scheme = "https"
			federationUrl.Path = ""
			viper.Set("Federation.DiscoveryUrl", federationUrl.String())
			err = config.DiscoverFederation()
			if err != nil {
				return 0, err
			}
		}
	}
	remoteDestScheme, _ = getTokenName(remoteDestUrl)

	understoodSchemes := []string{"file", "osdf", "pelican", ""}

	_, foundDest := Find(understoodSchemes, remoteDestScheme)
	if !foundDest {
		return 0, fmt.Errorf("Do not understand the destination scheme: %s. Permitted values are %s",
			remoteDestUrl.Scheme, strings.Join(understoodSchemes, ", "))
	}

	directorUrl := param.Federation_DirectorUrl.GetString()

	// Get the namespace of the remote filesystem
	// For write back, it will be the destination
	if !strings.HasPrefix(remoteDestination, "/") {
		remoteDestination = strings.TrimPrefix(remoteDestination, remoteDestScheme+"://")
	}
	ns, err := getNamespaceInfo(remoteDestination, directorUrl, isPut)
	if err != nil {
		log.Errorln(err)
		// For a put the namespace lookup is performed on the destination.
		return 0, errors.New("Failed to get namespace information from destination")
	}

	uploadedBytes, err := doWriteBack(localObjectUrl.Path, remoteDestUrl, ns, recursive)
	// NOTE(review): AddError is called even when err is nil — confirm the
	// accumulator tolerates nil entries.
	AddError(err)
	return uploadedBytes, err
}

// DoGet is the entry point for `pelican object get`. It gathers information
// about the remote source (scheme, federation discovery, namespace), resolves
// the local destination path, and then downloads via download_http.
//
//   - remoteObject: the source file/directory you would like to download
//   - localDestination: the local end location of the download
//   - recursive: a boolean indicating if the source is a directory or not
//
// It returns the number of bytes reported by download_http. A panic raised
// while the transfer is being set up or performed is recovered, added to the
// error accumulator, and reported through err with bytesTransferred reset to 0.
func DoGet(remoteObject string, localDestination string, recursive bool) (bytesTransferred int64, err error) {
	isPut := false
	// First, create a handler for any panics that occur
	defer func() {
		if r := recover(); r != nil {
			log.Debugln("Panic captured while attempting to perform transfer (DoGet):", r)
			log.Debugln("Panic caused by the following", string(debug.Stack()))
			ret := fmt.Sprintf("Unrecoverable error (panic) captured in DoGet: %v", r)
			err = errors.New(ret)
			bytesTransferred = 0

			// Attempt to add the panic to the error accumulator
			AddError(errors.New(ret))
		}
	}()

	// Parse the source with URL parse
	remoteObject, remoteObjectScheme := correctURLWithUnderscore(remoteObject)
	remoteObjectUrl, err := url.Parse(remoteObject)
	if err != nil {
		log.Errorln("Failed to parse source URL:", err)
		return 0, err
	}
	remoteObjectUrl.Scheme = remoteObjectScheme

	// If there is a host specified, prepend it to the path in the osdf case
	if remoteObjectUrl.Host != "" {
		switch remoteObjectUrl.Scheme {
		case "osdf":
			remoteObjectUrl.Path, err = url.JoinPath(remoteObjectUrl.Host, remoteObjectUrl.Path)
			if err != nil {
				log.Errorln("Failed to join source url path:", err)
				return 0, err
			}
		case "pelican":
			// The host names the federation; use it as the discovery URL.
			// The parse error is checked so a bad URL surfaces as an error
			// instead of a nil-pointer dereference on federationUrl.
			federationUrl, parseErr := url.Parse(remoteObjectUrl.String())
			if parseErr != nil {
				log.Errorln("Failed to parse source URL:", parseErr)
				return 0, parseErr
			}
			federationUrl.Scheme = "https"
			federationUrl.Path = ""
			viper.Set("Federation.DiscoveryUrl", federationUrl.String())
			err = config.DiscoverFederation()
			if err != nil {
				return 0, err
			}
		}
	}

	remoteObjectScheme, _ = getTokenName(remoteObjectUrl)

	understoodSchemes := []string{"file", "osdf", "pelican", ""}

	_, foundSource := Find(understoodSchemes, remoteObjectScheme)
	if !foundSource {
		return 0, fmt.Errorf("Do not understand the source scheme: %s. Permitted values are %s",
			remoteObjectUrl.Scheme, strings.Join(understoodSchemes, ", "))
	}

	if remoteObjectScheme == "osdf" || remoteObjectScheme == "pelican" {
		remoteObject = remoteObjectUrl.Path
	}

	// HasPrefix (rather than indexing remoteObject[0]) keeps an empty path
	// from panicking with an index-out-of-range.
	if !strings.HasPrefix(remoteObject, "/") {
		remoteObject = "/" + remoteObject
	}

	directorUrl := param.Federation_DirectorUrl.GetString()

	ns, err := getNamespaceInfo(remoteObject, directorUrl, isPut)
	if err != nil {
		log.Errorln(err)
		return 0, errors.New("Failed to get namespace information from source")
	}

	// get absolute path
	localDestPath, err := filepath.Abs(localDestination)
	if err != nil {
		log.Errorln("Failed to resolve absolute path of destination:", err)
		return 0, err
	}

	// Check if path exists or if its in a folder
	destStat, statErr := os.Stat(localDestPath)
	switch {
	case os.IsNotExist(statErr):
		localDestination = localDestPath
	case statErr != nil:
		// Any other stat failure leaves destStat nil; report it instead of
		// dereferencing a nil FileInfo below.
		log.Errorln("Failed to stat destination:", statErr)
		return 0, fmt.Errorf("failed to stat destination %q: %w", localDestPath, statErr)
	case destStat.IsDir() && remoteObjectUrl.Query().Get("pack") == "":
		// If we have an auto-pack request, it's OK for the destination to be a directory
		// Otherwise, get the base name of the source and append it to the destination dir.
		// The remote name is a slash-separated URL path (path.Base); the
		// destination is a local filesystem path (filepath.Join).
		remoteObjectFilename := path.Base(remoteObject)
		localDestination = filepath.Join(localDestPath, remoteObjectFilename)
	}

	payload := payloadStruct{}
	payload.version = version

	// Fill out the payload as much as possible
	payload.filename = remoteObjectUrl.Path

	// NOTE(review): payload is passed by value here, so any plain fields
	// parse_job_ad sets on its copy are not reflected in this payload —
	// confirm this is intended.
	parse_job_ad(payload)

	payload.start1 = time.Now().Unix()

	_, token_name := getTokenName(remoteObjectUrl)

	downloaded, err := download_http(remoteObjectUrl, localDestination, &payload, ns, recursive, token_name)

	payload.end1 = time.Now().Unix()
	payload.timestamp = payload.end1
	payload.downloadTime = payload.end1 - payload.start1

	if err != nil {
		log.Error("Http GET failed! Unable to download file.")
		payload.status = "Fail"
		// Keep the original message as the prefix but preserve the cause.
		return downloaded, fmt.Errorf("failed to download file: %w", err)
	}

	payload.status = "Success"

	// Get the final size of the download file
	payload.fileSize = downloaded
	payload.downloadSize = downloaded
	return downloaded, nil
}

// Start the transfer, whether read or write back. Primarily used for backwards compatibility
func DoStashCPSingle(sourceFile string, destination string, methods []string, recursive bool) (bytesTransferred int64, err error) {

// First, create a handler for any panics that occur
Expand Down Expand Up @@ -627,7 +844,7 @@ Loop:
switch method {
case "http":
log.Info("Trying HTTP...")
if downloaded, err = download_http(source_url, destination, &payload, ns, recursive, token_name, OSDFDirectorUrl); err == nil {
if downloaded, err = download_http(source_url, destination, &payload, ns, recursive, token_name); err == nil {
success = true
break Loop
}
Expand All @@ -648,17 +865,11 @@ Loop:
// Get the final size of the download file
payload.fileSize = downloaded
payload.downloadSize = downloaded
return downloaded, nil
} else {
log.Error("All methods failed! Unable to download file.")
payload.status = "Fail"
return downloaded, errors.New("All methods failed! Unable to download file.")
}

if !success {
return downloaded, errors.New("failed to download file")
} else {
return downloaded, nil
}

}

// Find takes a slice and looks for an element in it. If found it will
Expand Down
Loading

0 comments on commit 4c09b62

Please sign in to comment.