From c228170a76a2ab0d711304224e8333bf45fb6188 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Fri, 1 Mar 2024 13:56:04 -0600 Subject: [PATCH 1/5] Fixes to pelican plugin staging Changes: - Found that how condor gives hooks a classad is in a different format than the plugin for some reason. For the hook the classad is split up by newlines but for the plugin it is by ';'. Therefore, made a new function for plugin stage to utilize. - Added function within the plugin to force logging to go to stderr (even though that is the default) as well as a file called `.PelicanPlugin.log`. This extra file is something you can specify in a condor submit file in `transfer_output_files` to get it back but only works if the job does not go on hold/if the plugin does not fail (not super helpful yet) --- classads/classads.go | 69 ++++++++++++++++++++++++++++++++++++++++++++ cmd/plugin.go | 31 ++++++++++++++++++++ cmd/plugin_stage.go | 3 +- 3 files changed, 102 insertions(+), 1 deletion(-) diff --git a/classads/classads.go b/classads/classads.go index 1a91e85e0..00c20f5e6 100644 --- a/classads/classads.go +++ b/classads/classads.go @@ -222,3 +222,72 @@ func attributeSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err } return 0, nil, nil } + +func ParseShadowClassAd(line string) (ClassAd, error) { + var ad ClassAd + ad.attributes = make(map[string]interface{}) + + // Trim the spaces and "[" "]" + line = strings.TrimSpace(line) + line = strings.TrimPrefix(line, "[") + line = strings.TrimSuffix(line, "]") + + attributeScanner := bufio.NewScanner(strings.NewReader(line)) + attributeScanner.Split(attributeShadowSplitFunc) + for attributeScanner.Scan() { + attrStr := attributeScanner.Text() + attrStr = strings.TrimSpace(attrStr) + if attrStr == "" { + continue + } + + // Split on the first "=" + attrSplit := strings.SplitN(attrStr, "=", 2) + name := strings.TrimSpace(attrSplit[0]) + + // Check for quoted attribute and remove it + value := strings.TrimSpace(attrSplit[1]) + + // 
If the value is quoted, remove the quotes + if strings.HasPrefix(value, "\"") && strings.HasSuffix(value, "\"") { + value = strings.Trim(value, "\"") + } + + // Convert the value based on its type + if intValue, err := strconv.Atoi(value); err == nil { + // If the value is a number, we know it's an integer + ad.Set(name, intValue) + } else if floatValue, err := strconv.ParseFloat(value, 64); err == nil { + // If the value is a float, we know it's a float + ad.Set(name, floatValue) + } else if value == "true" || value == "false" { + // If the value is a boolean, we know it's a boolean + ad.Set(name, value == "true") + } else { + // Otherwise, we assume it's a string + ad.Set(name, value) + } + } + return ad, nil +} + +// Split the classad by attribute, one attribute per newline-terminated line +func attributeShadowSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + // Look for the next newline character in the input data + if i := strings.Index(string(data), "\n"); i >= 0 { + // Found a newline character, return the split point + return i + 1, data[0:i], nil + } + + // If at end of file and no newline character is found, return the entire remaining data + if atEOF { + return len(data), data, nil + } + + // Need more data to find a newline character + return 0, nil, nil +} diff --git a/cmd/plugin.go b/cmd/plugin.go index 53106e9f1..07373533b 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -37,6 +37,7 @@ import ( "github.com/pelicanplatform/pelican/config" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/writer" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) @@ -116,6 +117,12 @@ func stashPluginMain(args []string) { configErr = errors.Wrap(configErr, "Problem initializing the Pelican Client configuration") isConfigErr = true } + loggingErr := setPluginLogging() + if loggingErr != nil { + // Note: not exiting here because logs should 
have default output of os.Stderr which is what we mainly want anyway. + // Therefore, just warn the user and continue + log.Warningf("Problem setting logging for the pelican plugin: %v", loggingErr) + } // Parse command line arguments var upload bool = false @@ -580,6 +587,30 @@ func writeOutfile(err error, resultAds []*classads.ClassAd, outputFile *os.File) return success, retryable } +func setPluginLogging() (err error) { + file, err := os.OpenFile(".PelicanPlugin.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + err = errors.New("Failed to open log file for writing.") + } else { + // Add file output to log + log.AddHook(&writer.Hook{ + Writer: file, + LogLevels: []log.Level{ + log.DebugLevel, + log.InfoLevel, + log.WarnLevel, + log.ErrorLevel, + log.FatalLevel, + log.PanicLevel, + }, + }) + } + // Default is stderr, but want to ensure it remains for the plugin + log.SetOutput(os.Stderr) + + return err +} + // readMultiTransfers reads the transfers from a Reader, such as stdin func readMultiTransfers(stdin bufio.Reader) (transfers []PluginTransfer, err error) { // Check stdin for a list of transfers diff --git a/cmd/plugin_stage.go b/cmd/plugin_stage.go index 3484b555d..ffde7f4e9 100644 --- a/cmd/plugin_stage.go +++ b/cmd/plugin_stage.go @@ -138,7 +138,7 @@ func stagePluginMain(cmd *cobra.Command, args []string) { log.Errorln("Failed to read ClassAd from stdin:", err) os.Exit(1) } - classad, err := classads.ParseClassAd(string(buffer[:bytesread])) + classad, err := classads.ParseShadowClassAd(string(buffer[:bytesread])) if err != nil { log.Errorln("Failed to parse ClassAd from stdin: ", err) os.Exit(1) @@ -146,6 +146,7 @@ func stagePluginMain(cmd *cobra.Command, args []string) { inputList, err := classad.Get("TransferInput") if err != nil || inputList == nil { // No TransferInput, no need to transform... 
+ log.Debugln("No transfer input found in classad, no need to transform.") os.Exit(0) } inputListStr, ok := inputList.(string) From 9e560dd1b01b12bbeeb0626c6b635920811850cd Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Mon, 11 Mar 2024 12:24:09 -0500 Subject: [PATCH 2/5] Refactor plugin staging to make it more testable Separates out a lot of pieces of the main function into smaller functions. This helps with readability as well as for future unit testing. --- cmd/plugin_stage.go | 203 ++++++++++++++++++++++++++------------------ 1 file changed, 119 insertions(+), 84 deletions(-) diff --git a/cmd/plugin_stage.go b/cmd/plugin_stage.go index ffde7f4e9..df5167696 100644 --- a/cmd/plugin_stage.go +++ b/cmd/plugin_stage.go @@ -21,6 +21,7 @@ package main import ( "context" "fmt" + "io" "net/url" "os" "path" @@ -89,34 +90,18 @@ func stagePluginMain(cmd *cobra.Command, args []string) { ctx := cmd.Context() originPrefixStr := param.StagePlugin_OriginPrefix.GetString() - if len(originPrefixStr) == 0 { - log.Errorln("Origin prefix not specified; must be a URL (osdf://...)") - os.Exit(1) - } - originPrefixUri, err := url.Parse(originPrefixStr) + mountPrefixStr := param.StagePlugin_MountPrefix.GetString() + shadowOriginPrefixStr := param.StagePlugin_ShadowOriginPrefix.GetString() + + originPrefixUri, err := validatePrefixes(originPrefixStr, mountPrefixStr, shadowOriginPrefixStr) if err != nil { - log.Errorln("Origin prefix must be a URL (osdf://...):", err) - os.Exit(1) - } - if originPrefixUri.Scheme != "osdf" { - log.Errorln("Origin prefix scheme must be osdf://:", originPrefixUri.Scheme) + log.Errorln("Problem validating provided prefixes:", err) os.Exit(1) } + originPrefixPath := path.Clean("/" + originPrefixUri.Host + "/" + originPrefixUri.Path) log.Debugln("Local origin prefix:", originPrefixPath) - mountPrefixStr := param.StagePlugin_MountPrefix.GetString() - if len(mountPrefixStr) == 0 { - log.Errorln("Mount prefix is required; must be a local path (/mnt/foo/...)") - 
os.Exit(1) - } - - shadowOriginPrefixStr := param.StagePlugin_ShadowOriginPrefix.GetString() - if len(shadowOriginPrefixStr) == 0 { - log.Errorln("Shadow origin prefix is required; must be a URL (osdf://....)") - os.Exit(1) - } - tokenLocation := param.Plugin_Token.GetString() pb := newProgressBar() @@ -128,52 +113,12 @@ func stagePluginMain(cmd *cobra.Command, args []string) { pb.launchDisplay(ctx) } - var sources []string - var extraSources []string isHook := param.StagePlugin_Hook.GetBool() - if isHook { - buffer := make([]byte, 100*1024) - bytesread, err := os.Stdin.Read(buffer) - if err != nil { - log.Errorln("Failed to read ClassAd from stdin:", err) - os.Exit(1) - } - classad, err := classads.ParseShadowClassAd(string(buffer[:bytesread])) - if err != nil { - log.Errorln("Failed to parse ClassAd from stdin: ", err) - os.Exit(1) - } - inputList, err := classad.Get("TransferInput") - if err != nil || inputList == nil { - // No TransferInput, no need to transform... - log.Debugln("No transfer input found in classad, no need to transform.") - os.Exit(0) - } - inputListStr, ok := inputList.(string) - if !ok { - log.Errorln("TransferInput is not a string") - os.Exit(1) - } - re := regexp.MustCompile(`[,\s]+`) - for _, source := range re.Split(inputListStr, -1) { - log.Debugln("Examining transfer input file", source) - if strings.HasPrefix(source, mountPrefixStr) { - sources = append(sources, source) - } else { - // Replace the osdf:// prefix with the local mount path - source_uri, err := url.Parse(source) - source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0] - if err == nil && source_uri_scheme == "osdf" { - source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path) - if strings.HasPrefix(source_path, originPrefixPath) { - sources = append(sources, mountPrefixStr+source_path[len(originPrefixPath):]) - continue - } - } - extraSources = append(extraSources, source) - } - } - } else { + var sources, extraSources []string + var exitCode int 
+ + // If not a condor hook, our sources come from our args + if !isHook { log.Debugln("Len of source:", len(args)) if len(args) < 1 { log.Errorln("No ingest sources") @@ -183,11 +128,39 @@ func stagePluginMain(cmd *cobra.Command, args []string) { os.Exit(1) } sources = args + log.Debugln("Sources:", sources) + } else { // Otherwise, parse the classad for our sources + // We pass in stdin here because that is how we get the classad + sources, extraSources, err, exitCode = processTransferInput(os.Stdin, mountPrefixStr, originPrefixPath) + if err != nil { + log.Errorln("Failure to get sources from job's classad:", err) + os.Exit(exitCode) + } } - log.Debugln("Sources:", sources) var result error var xformSources []string + + xformSources, result = doPluginStaging(sources, extraSources, mountPrefixStr, shadowOriginPrefixStr, tokenLocation) + // Exit with failure + if result != nil { + // Print the list of errors + log.Errorln("Failure in staging files:", result) + if client.ShouldRetry(result) { + log.Errorln("Errors are retryable") + os.Exit(11) + } + os.Exit(1) + } + // If we are a condor hook, we need to print the classad change out. 
Condor will notice it and handle the rest + if isHook { + printOutput(xformSources, extraSources) + } +} + +// This function performs the actual "staging" on the specified shadow origin +func doPluginStaging(sources []string, extraSources []string, mountPrefixStr, shadowOriginPrefixStr, tokenLocation string) (xformSources []string, result error) { + for _, src := range sources { newSource := "" _, newSource, result = client.DoShadowIngest(context.Background(), src, mountPrefixStr, shadowOriginPrefixStr, client.WithTokenLocation(tokenLocation), client.WithAcquireToken(false)) @@ -204,23 +177,85 @@ func stagePluginMain(cmd *cobra.Command, args []string) { xformSources = append(xformSources, newSource) } - // Exit with failure - if result != nil { - // Print the list of errors - log.Errorln("Failure in staging files:", result) - if client.ShouldRetry(result) { - log.Errorln("Errors are retryable") - os.Exit(11) - } - os.Exit(1) + return xformSources, result +} + +// This function is used to print our changes out in the case we are a condor hook +func printOutput(xformSources []string, extraSources []string) { + inputsStr := strings.Join(extraSources, ", ") + if len(extraSources) > 0 && len(xformSources) > 0 { + inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ") + } else if len(xformSources) > 0 { + inputsStr = strings.Join(xformSources, ", ") } - if isHook { - inputsStr := strings.Join(extraSources, ", ") - if len(extraSources) > 0 && len(xformSources) > 0 { - inputsStr = inputsStr + ", " + strings.Join(xformSources, ", ") - } else if len(xformSources) > 0 { - inputsStr = strings.Join(xformSources, ", ") + fmt.Printf("TransferInput = \"%s\"", inputsStr) +} + +// This function is utilized to validate the arguments passed in to ensure they exist and are in the correct format +func validatePrefixes(originPrefixStr string, mountPrefixStr string, shadowOriginPrefixStr string) (originPrefixUri *url.URL, err error) { + if len(originPrefixStr) == 0 { + return 
nil, fmt.Errorf("Origin prefix not specified; must be a URL (osdf://...)") + } + + originPrefixUri, err = url.Parse(originPrefixStr) + if err != nil { + return nil, fmt.Errorf("Origin prefix must be a URL (osdf://...): %v", err) + } + if originPrefixUri.Scheme != "osdf" { + return nil, fmt.Errorf("Origin prefix scheme must be osdf://: %s", originPrefixUri.Scheme) + } + + if len(mountPrefixStr) == 0 { + return nil, fmt.Errorf("Mount prefix is required; must be a local path (/mnt/foo/...)") + } + if len(shadowOriginPrefixStr) == 0 { + return nil, fmt.Errorf("Shadow origin prefix is required; must be a URL (osdf://....)") + } + + return originPrefixUri, nil +} + +// This function is used when we are using a condor hook and need to get our sources from the "TransferInput" classad +// We return our sources, any extra sources, an err, and the exit code (since we have a case to exit 0) +// Note: we pass in a reader for testability but the main function will always pass stdin to get the classad +func processTransferInput(reader io.Reader, mountPrefixStr string, originPrefixPath string) (sources []string, extraSources []string, err error, exitCode int) { + buffer := make([]byte, 100*1024) + bytesread, err := reader.Read(buffer) + if err != nil { + return nil, nil, fmt.Errorf("Failed to read ClassAd from stdin: %v", err), 1 + } + classad, err := classads.ParseShadowClassAd(string(buffer[:bytesread])) + if err != nil { + return nil, nil, fmt.Errorf("Failed to parse ClassAd from stdin: %v", err), 1 + } + inputList, err := classad.Get("TransferInput") + if err != nil || inputList == nil { + // No TransferInput, no need to transform therefore we exit(0) + return nil, nil, fmt.Errorf("No transfer input found in classad, no need to transform."), 0 + } + inputListStr, ok := inputList.(string) + if !ok { + return nil, nil, fmt.Errorf("TransferInput is not a string"), 1 + } + re := regexp.MustCompile(`[,\s]+`) + for _, source := range re.Split(inputListStr, -1) { + 
log.Debugln("Examining transfer input file", source) + if strings.HasPrefix(source, mountPrefixStr) { + sources = append(sources, source) + } else { + // Replace the osdf:// prefix with the local mount path + source_uri, err := url.Parse(source) + source_uri_scheme := strings.SplitN(source_uri.Scheme, "+", 2)[0] + if err == nil && source_uri_scheme == "osdf" { + source_path := path.Clean("/" + source_uri.Host + "/" + source_uri.Path) + if strings.HasPrefix(source_path, originPrefixPath) { + sources = append(sources, mountPrefixStr+source_path[len(originPrefixPath):]) + continue + } + } + extraSources = append(extraSources, source) } - fmt.Printf("TransferInput = \"%s\"", inputsStr) } + log.Debugln("Sources:", sources) + return sources, extraSources, nil, 0 } From 75352ead99b5e59859d0497aef4d7c659f460baf Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Tue, 19 Mar 2024 15:34:43 -0500 Subject: [PATCH 3/5] Remove fmt.Print in config This causes issues with plugin staging as condor will assume this is a classad being printed out when it is in a shadow hook with the -l flag. This needs to be removed, or some sort of check added for whether it is used with staging or the plugin --- config/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/config/config.go b/config/config.go index cf65cb6d6..3905d84f4 100644 --- a/config/config.go +++ b/config/config.go @@ -723,7 +723,6 @@ func InitConfig() { log.Errorf("Failed to access specified log file. Error: %v", err) os.Exit(1) } - fmt.Printf("Logging.LogLocation is set to %s. 
All logs are redirected to the log file.\n", logLocation) log.SetOutput(f) } From 111721876ac36c0f6ebb141d8239668d342a6894 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Fri, 29 Mar 2024 10:52:14 -0500 Subject: [PATCH 4/5] Remove making logging output file for plugin Just annoying for testing and not really needed, at least for now --- cmd/plugin.go | 33 +++------------------------------ 1 file changed, 3 insertions(+), 30 deletions(-) diff --git a/cmd/plugin.go b/cmd/plugin.go index 07373533b..a51ec5c32 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -37,7 +37,6 @@ import ( "github.com/pelicanplatform/pelican/config" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/writer" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) @@ -117,12 +116,9 @@ func stashPluginMain(args []string) { configErr = errors.Wrap(configErr, "Problem initializing the Pelican Client configuration") isConfigErr = true } - loggingErr := setPluginLogging() - if loggingErr != nil { - // Note: not exiting here because logs should have default output of os.Stderr which is what we mainly want anyway. 
- // Therefore, just warn the user and continue - log.Warningf("Problem setting logging for the pelican plugin: %v", loggingErr) - } + + // Want to try to force logging to stderr because that is how we can see logging in condor starter log + log.SetOutput(os.Stderr) // Parse command line arguments var upload bool = false @@ -587,29 +583,6 @@ func writeOutfile(err error, resultAds []*classads.ClassAd, outputFile *os.File) return success, retryable } -func setPluginLogging() (err error) { - file, err := os.OpenFile(".PelicanPlugin.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - err = errors.New("Failed to open log file for writing.") - } else { - // Add file output to log - log.AddHook(&writer.Hook{ - Writer: file, - LogLevels: []log.Level{ - log.DebugLevel, - log.InfoLevel, - log.WarnLevel, - log.ErrorLevel, - log.FatalLevel, - log.PanicLevel, - }, - }) - } - // Default is stderr, but want to ensure it remains for the plugin - log.SetOutput(os.Stderr) - - return err -} // readMultiTransfers reads the transfers from a Reader, such as stdin func readMultiTransfers(stdin bufio.Reader) (transfers []PluginTransfer, err error) { From 9bbadec1b74939b3e7f2e9b0f0fda6f20100b288 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Fri, 29 Mar 2024 10:59:35 -0500 Subject: [PATCH 5/5] fix linter --- cmd/plugin.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/plugin.go b/cmd/plugin.go index a51ec5c32..e42f4c878 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -583,7 +583,6 @@ func writeOutfile(err error, resultAds []*classads.ClassAd, outputFile *os.File) return success, retryable } - // readMultiTransfers reads the transfers from a Reader, such as stdin func readMultiTransfers(stdin bufio.Reader) (transfers []PluginTransfer, err error) { // Check stdin for a list of transfers