Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to pelican plugin staging #882

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions classads/classads.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,72 @@ func attributeSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err
}
return 0, nil, nil
}

func ParseShadowClassAd(line string) (ClassAd, error) {
var ad ClassAd
ad.attributes = make(map[string]interface{})

// Trim the spaces and "[" "]"
line = strings.TrimSpace(line)
line = strings.TrimPrefix(line, "[")
line = strings.TrimSuffix(line, "]")

attributeScanner := bufio.NewScanner(strings.NewReader(line))
attributeScanner.Split(attributeShadowSplitFunc)
for attributeScanner.Scan() {
attrStr := attributeScanner.Text()
attrStr = strings.TrimSpace(attrStr)
if attrStr == "" {
continue
}

// Split on the first "="
attrSplit := strings.SplitN(attrStr, "=", 2)
name := strings.TrimSpace(attrSplit[0])

// Check for quoted attribute and remove it
value := strings.TrimSpace(attrSplit[1])

// If the value is quoted, remove the quotes
if strings.HasPrefix(value, "\"") && strings.HasSuffix(value, "\"") {
value = strings.Trim(value, "\"")
}

// Convert the value based on its type
if intValue, err := strconv.Atoi(value); err == nil {
// If the value is a number, we know it's an integer
ad.Set(name, intValue)
} else if floatValue, err := strconv.ParseFloat(value, 64); err == nil {
// If the value is a float, we know it's a float
ad.Set(name, floatValue)
} else if value == "true" || value == "false" {
// If the value is a boolean, we know it's a boolean
ad.Set(name, value == "true")
} else {
// Otherwise, we assume it's a string
ad.Set(name, value)
}
}
return ad, nil
}

// Split the classad by attribute, at the first semi-colon not in quotes
func attributeShadowSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

// Look for the next newline character in the input data
if i := strings.Index(string(data), "\n"); i >= 0 {
// Found a newline character, return the split point
return i + 1, data[0:i], nil
}

// If at end of file and no newline character is found, return the entire remaining data
if atEOF {
return len(data), data, nil
}

// Need more data to find a newline character
return 0, nil, nil
}
3 changes: 3 additions & 0 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func stashPluginMain(args []string) {
isConfigErr = true
}

// Want to try to force logging to stderr because that is how we can see logging in condor starter log
log.SetOutput(os.Stderr)

// Parse command line arguments
var upload bool = false
// Set the options
Expand Down
202 changes: 119 additions & 83 deletions cmd/plugin_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package main
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -89,34 +90,18 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
ctx := cmd.Context()

originPrefixStr := param.StagePlugin_OriginPrefix.GetString()
if len(originPrefixStr) == 0 {
log.Errorln("Origin prefix not specified; must be a URL (osdf://...)")
os.Exit(1)
}
originPrefixUri, err := url.Parse(originPrefixStr)
mountPrefixStr := param.StagePlugin_MountPrefix.GetString()
shadowOriginPrefixStr := param.StagePlugin_ShadowOriginPrefix.GetString()

originPrefixUri, err := validatePrefixes(originPrefixStr, mountPrefixStr, shadowOriginPrefixStr)
if err != nil {
log.Errorln("Origin prefix must be a URL (osdf://...):", err)
os.Exit(1)
}
if originPrefixUri.Scheme != "osdf" {
log.Errorln("Origin prefix scheme must be osdf://:", originPrefixUri.Scheme)
log.Errorln("Problem validating provided prefixes:", err)
os.Exit(1)
}

originPrefixPath := path.Clean("/" + originPrefixUri.Host + "/" + originPrefixUri.Path)
log.Debugln("Local origin prefix:", originPrefixPath)

mountPrefixStr := param.StagePlugin_MountPrefix.GetString()
if len(mountPrefixStr) == 0 {
log.Errorln("Mount prefix is required; must be a local path (/mnt/foo/...)")
os.Exit(1)
}

shadowOriginPrefixStr := param.StagePlugin_ShadowOriginPrefix.GetString()
if len(shadowOriginPrefixStr) == 0 {
log.Errorln("Shadow origin prefix is required; must be a URL (osdf://....)")
os.Exit(1)
}

tokenLocation := param.Plugin_Token.GetString()

pb := newProgressBar()
Expand All @@ -128,51 +113,12 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
pb.launchDisplay(ctx)
}

var sources []string
var extraSources []string
isHook := param.StagePlugin_Hook.GetBool()
if isHook {
buffer := make([]byte, 100*1024)
bytesread, err := os.Stdin.Read(buffer)
if err != nil {
log.Errorln("Failed to read ClassAd from stdin:", err)
os.Exit(1)
}
classad, err := classads.ParseClassAd(string(buffer[:bytesread]))
if err != nil {
log.Errorln("Failed to parse ClassAd from stdin: ", err)
os.Exit(1)
}
inputList, err := classad.Get("TransferInput")
if err != nil || inputList == nil {
// No TransferInput, no need to transform...
os.Exit(0)
}
inputListStr, ok := inputList.(string)
if !ok {
log.Errorln("TransferInput is not a string")
os.Exit(1)
}
re := regexp.MustCompile(`[,\s]+`)
for _, source := range re.Split(inputListStr, -1) {
log.Debugln("Examining transfer input file", source)
if strings.HasPrefix(source, mountPrefixStr) {
sources = append(sources, source)
} else {
// Replace the osdf:// prefix with the local mount path
source_uri, err := url.Parse(source)
source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0]
if err == nil && source_uri_scheme == "osdf" {
source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path)
if strings.HasPrefix(source_path, originPrefixPath) {
sources = append(sources, mountPrefixStr+source_path[len(originPrefixPath):])
continue
}
}
extraSources = append(extraSources, source)
}
}
} else {
var sources, extraSources []string
var exitCode int

// If not a condor hook, our souces come from our args
if !isHook {
log.Debugln("Len of source:", len(args))
if len(args) < 1 {
log.Errorln("No ingest sources")
Expand All @@ -182,11 +128,39 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
os.Exit(1)
}
sources = args
log.Debugln("Sources:", sources)
} else { // Otherwise, parse the classad for our sources
// We pass in stdin here because that is how we get the classad
sources, extraSources, err, exitCode = processTransferInput(os.Stdin, mountPrefixStr, originPrefixPath)
if err != nil {
log.Errorln("Failure to get sources from job's classad:", err)
os.Exit(exitCode)
}
}
log.Debugln("Sources:", sources)

var result error
var xformSources []string

xformSources, result = doPluginStaging(sources, extraSources, mountPrefixStr, shadowOriginPrefixStr, tokenLocation)
// Exit with failure
if result != nil {
// Print the list of errors
log.Errorln("Failure in staging files:", result)
if client.ShouldRetry(result) {
log.Errorln("Errors are retryable")
os.Exit(11)
}
os.Exit(1)
}
// If we are a condor hook, we need to print the classad change out. Condor will notice it and handle the rest
if isHook {
printOutput(xformSources, extraSources)
}
}

// This function performs the actual "staging" on the specified shadow origin
func doPluginStaging(sources []string, extraSources []string, mountPrefixStr, shadowOriginPrefixStr, tokenLocation string) (xformSources []string, result error) {

for _, src := range sources {
newSource := ""
_, newSource, result = client.DoShadowIngest(context.Background(), src, mountPrefixStr, shadowOriginPrefixStr, client.WithTokenLocation(tokenLocation), client.WithAcquireToken(false))
Expand All @@ -203,23 +177,85 @@ func stagePluginMain(cmd *cobra.Command, args []string) {
xformSources = append(xformSources, newSource)
}

// Exit with failure
if result != nil {
// Print the list of errors
log.Errorln("Failure in staging files:", result)
if client.ShouldRetry(result) {
log.Errorln("Errors are retryable")
os.Exit(11)
}
os.Exit(1)
return xformSources, result
}

// This function is used to print our changes out in the case we are a condor hook
func printOutput(xformSources []string, extraSources []string) {
inputsStr := strings.Join(extraSources, ", ")
if len(extraSources) > 0 && len(xformSources) > 0 {
inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ")
} else if len(xformSources) > 0 {
inputsStr = strings.Join(xformSources, ", ")
}
if isHook {
inputsStr := strings.Join(extraSources, ", ")
if len(extraSources) > 0 && len(xformSources) > 0 {
inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ")
} else if len(xformSources) > 0 {
inputsStr = strings.Join(xformSources, ", ")
fmt.Printf("TransferInput = \"%s\"", inputsStr)
}

// This function is utilized to validate the arguments passed in to ensure they exist and are in the correct format
func validatePrefixes(originPrefixStr string, mountPrefixStr string, shadowOriginPrefixStr string) (originPrefixUri *url.URL, err error) {
if len(originPrefixStr) == 0 {
return nil, fmt.Errorf("Origin prefix not specified; must be a URL (osdf://...)")
}

originPrefixUri, err = url.Parse(originPrefixStr)
if err != nil {
return nil, fmt.Errorf("Origin prefix must be a URL (osdf://...): %v", err)
}
if originPrefixUri.Scheme != "osdf" {
return nil, fmt.Errorf("Origin prefix scheme must be osdf://: %s", originPrefixUri.Scheme)
}

if len(mountPrefixStr) == 0 {
return nil, fmt.Errorf("Mount prefix is required; must be a local path (/mnt/foo/...)")
}
if len(shadowOriginPrefixStr) == 0 {
return nil, fmt.Errorf("Shadow origin prefix is required; must be a URL (osdf://....)")
}

return originPrefixUri, nil
}

// This function is used when we are using a condor hook and need to get our sources from the "TransferInput" classad
// We return our sources, any extra sources, an err, and the exit code (since we have a case to exit 0)
// Note: we pass in a reader for testability but the main function will always pass stdin to get the classad
func processTransferInput(reader io.Reader, mountPrefixStr string, originPrefixPath string) (sources []string, extraSources []string, err error, exitCode int) {
buffer := make([]byte, 100*1024)
bytesread, err := reader.Read(buffer)
if err != nil {
return nil, nil, fmt.Errorf("Failed to read ClassAd from stdin: %v", err), 1
}
classad, err := classads.ParseShadowClassAd(string(buffer[:bytesread]))
if err != nil {
return nil, nil, fmt.Errorf("Failed to parse ClassAd from stdin: %v", err), 1
}
inputList, err := classad.Get("TransferInput")
if err != nil || inputList == nil {
// No TransferInput, no need to transform therefore we exit(0)
return nil, nil, fmt.Errorf("No transfer input found in classad, no need to transform."), 0
}
inputListStr, ok := inputList.(string)
if !ok {
return nil, nil, fmt.Errorf("TransferInput is not a string"), 1
}
re := regexp.MustCompile(`[,\s]+`)
for _, source := range re.Split(inputListStr, -1) {
log.Debugln("Examining transfer input file", source)
if strings.HasPrefix(source, mountPrefixStr) {
sources = append(sources, source)
} else {
// Replace the osdf:// prefix with the local mount path
source_uri, err := url.Parse(source)
source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0]
if err == nil && source_uri_scheme == "osdf" {
source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path)
if strings.HasPrefix(source_path, originPrefixPath) {
sources = append(sources, mountPrefixStr+source_path[len(originPrefixPath):])
continue
}
}
extraSources = append(extraSources, source)
}
fmt.Printf("TransferInput = \"%s\"", inputsStr)
}
log.Debugln("Sources:", sources)
return sources, extraSources, nil, 0
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ func InitConfig() {
log.Errorf("Failed to access specified log file. Error: %v", err)
os.Exit(1)
}

fmt.Fprintf(os.Stderr, "Logging.LogLocation is set to %s. All logs are redirected to the log file.\n", logLocation)
log.SetOutput(f)
}
Expand Down
Loading