Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
Moved CheckQueryParam to utils instead of in two different packages as
well as moved the test for it. Added more test cases for directread
functionality.
  • Loading branch information
joereuss12 committed May 2, 2024
1 parent f873552 commit a58d404
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 215 deletions.
141 changes: 86 additions & 55 deletions client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ var (

//go:embed resources/one-pub-one-auth.yml
mixedAuthOriginCfg string

//go:embed resources/pub-export-no-directread.yml
pubExportNoDirectRead string

//go:embed resources/pub-export-no-directread.yml
pubOriginNoDirectRead string
)

// A test that spins up a federation, and tests object get and put
Expand Down Expand Up @@ -506,77 +512,102 @@ func TestStatHttp(t *testing.T) {
})
}

// Test the functionality of the direct reads feature (?DirectRead=true)
// Test the functionality of the direct reads feature (?directread)
func TestDirectReads(t *testing.T) {
defer viper.Reset()
t.Run("testDirectReadsSuccess", func(t *testing.T) {
viper.Reset()
server_utils.ResetOriginExports()
viper.Set("Origin.EnableDirectReads", true)
fed := fed_test_utils.NewFedTest(t, bothPublicOriginCfg)
for _, export := range fed.Exports {
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(export.StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()
export := fed.Exports[0]
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(export.StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

viper.Set("Logging.DisableProgressBars", true)
viper.Set("Logging.DisableProgressBars", true)

// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s?directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)
// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s?directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)

// Download the file with GET. Shouldn't need a token to succeed
transferResults, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
assert.NoError(t, err)
if err == nil {
assert.Equal(t, transferResults[0].TransferredBytes, int64(17))
}
// Assert that the file was not cached
cacheDataLocation := param.Cache_DataLocation.GetString() + export.FederationPrefix
filepath := filepath.Join(cacheDataLocation, filepath.Base(tempFile.Name()))
_, err = os.Stat(filepath)
assert.True(t, os.IsNotExist(err))
// Download the file with GET. Shouldn't need a token to succeed
transferResults, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
assert.NoError(t, err)
if err == nil {
assert.Equal(t, transferResults[0].TransferredBytes, int64(17))
}
// Assert that the file was not cached
cacheDataLocation := param.Cache_DataLocation.GetString() + export.FederationPrefix
filepath := filepath.Join(cacheDataLocation, filepath.Base(tempFile.Name()))
_, err = os.Stat(filepath)
assert.True(t, os.IsNotExist(err))
})

// Test that direct reads fail if DirectReads=false is set for origin config
t.Run("testDirectReadsDirectReadFalse", func(t *testing.T) {
// Test that direct reads fail if DirectReads=false is set for origin config but true for namespace/export
t.Run("testDirectReadsDirectReadFalseByOrigin", func(t *testing.T) {
viper.Reset()
server_utils.ResetOriginExports()
viper.Set("Origin.StorageType", "posix")
viper.Set("Origin.ExportVolumes", "/test")
viper.Set("Origin.EnablePublicReads", true)
viper.Set("Origin.EnableDirectReads", false)
fed := fed_test_utils.NewFedTest(t, "")
for _, export := range fed.Exports {
export.Capabilities.DirectReads = false
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(export.StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()
fed := fed_test_utils.NewFedTest(t, pubOriginNoDirectRead)
export := fed.Exports[0]
export.Capabilities.DirectReads = true
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(export.StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

viper.Set("Logging.DisableProgressBars", true)
viper.Set("Logging.DisableProgressBars", true)

// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s?directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)
// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s?directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)

// Download the file with GET. Shouldn't need a token to succeed
_, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
assert.Error(t, err)
assert.Contains(t, err.Error(), "No origins on specified endpoint have direct reads enabled")
}
// Download the file with GET. Shouldn't need a token to succeed
_, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
assert.Error(t, err)
assert.Contains(t, err.Error(), "No origins on specified endpoint have direct reads enabled")
})

// Test that direct reads fail if DirectReads=false is set for namespace/export config but true for origin
t.Run("testDirectReadsDirectReadFalseByNamespace", func(t *testing.T) {
viper.Reset()
server_utils.ResetOriginExports()
fed := fed_test_utils.NewFedTest(t, pubExportNoDirectRead)
export := fed.Exports[0]
export.Capabilities.DirectReads = false
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(export.StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

viper.Set("Logging.DisableProgressBars", true)

// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s?directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)

// Download the file with GET. Shouldn't need a token to succeed
_, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
assert.Error(t, err)
assert.Contains(t, err.Error(), "No origins on specified endpoint have direct reads enabled")
})
}
23 changes: 5 additions & 18 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/namespaces"
"github.com/pelicanplatform/pelican/utils"
)

// Number of caches to attempt to use in any invocation
Expand Down Expand Up @@ -364,20 +365,6 @@ func discoverHTCondorToken(tokenName string) string {
return tokenLocation
}

// This function checks if we have a valid query (or no query) for the transfer URL
func checkValidQuery(transferUrl *url.URL) (err error) {
query := transferUrl.Query()
_, hasDirectRead := query["directread"]
_, hasPack := query["pack"]

// If we have no query, or we have recursive or pack, we are good
if len(query) == 0 || hasDirectRead || hasPack {
return nil
}

return fmt.Errorf("Invalid query parameters provided in url: %s", transferUrl)
}

// Retrieve federation namespace information for a given URL.
// If OSDFDirectorUrl is non-empty, then the namespace information will be pulled from the director;
// otherwise, it is pulled from topology.
Expand Down Expand Up @@ -464,7 +451,7 @@ func DoPut(ctx context.Context, localObject string, remoteDestination string, re
}

// Check if we have a query and that it is understood
err = checkValidQuery(remoteDestUrl)
err = utils.CheckValidQuery(remoteDestUrl, false)
if err != nil {
return
}
Expand Down Expand Up @@ -535,7 +522,7 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re
}

// Check if we have a query and that it is understood
err = checkValidQuery(remoteObjectUrl)
err = utils.CheckValidQuery(remoteObjectUrl, false)
if err != nil {
return
}
Expand Down Expand Up @@ -661,7 +648,7 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv
return nil, err
}
// Check if we have a query and that it is understood
err = checkValidQuery(sourceURL)
err = utils.CheckValidQuery(sourceURL, false)
if err != nil {
return
}
Expand All @@ -675,7 +662,7 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv
}

// Check if we have a query and that it is understood
err = checkValidQuery(destURL)
err = utils.CheckValidQuery(destURL, false)
if err != nil {
return
}
Expand Down
41 changes: 0 additions & 41 deletions client/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,44 +356,3 @@ func TestSchemeUnderstood(t *testing.T) {
assert.Error(t, err)
})
}

// This test checks if query parameters in the url are correct
// We want invalid or > 1 query to cause an error
func TestQuery(t *testing.T) {
t.Run("TestValidDirectRead", func(t *testing.T) {
transferStr := "pelican://something/here?directread"
transferUrl, err := url.Parse(transferStr)
assert.NoError(t, err)

err = checkValidQuery(transferUrl)
assert.NoError(t, err)
})

t.Run("TestValidPack", func(t *testing.T) {
transferStr := "pelican://something/here?pack=tar.gz"
transferUrl, err := url.Parse(transferStr)
assert.NoError(t, err)

err = checkValidQuery(transferUrl)
assert.NoError(t, err)
})

t.Run("TestInvalidQuery", func(t *testing.T) {
transferStr := "pelican://something/here?durectreeds"
transferUrl, err := url.Parse(transferStr)
assert.NoError(t, err)

err = checkValidQuery(transferUrl)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Invalid query parameters provided in url: pelican://something/here?durectreeds")
})

t.Run("TestBothQueryCheckSuccess", func(t *testing.T) {
transferStr := "pelican://something/here?pack=tar.gz&directread"
transferUrl, err := url.Parse(transferStr)
assert.NoError(t, err)

err = checkValidQuery(transferUrl)
assert.NoError(t, err)
})
}
12 changes: 12 additions & 0 deletions client/resources/pub-export-no-directread.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Origin export configuration to test full multi-export capabilities

Origin:
# Things that configure the origin itself
StorageType: "posix"
EnableDirectReads: true
# The actual namespaces we export
Exports:
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
FederationPrefix: /first/namespace
# Don't set Reads -- it should be toggled true by setting PublicReads
Capabilities: ["PublicReads", "Writes"]
12 changes: 12 additions & 0 deletions client/resources/pub-origin-no-directread.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Origin export configuration to test full multi-export capabilities

Origin:
# Things that configure the origin itself
StorageType: "posix"
EnableDirectReads: false
# The actual namespaces we export
Exports:
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
FederationPrefix: /first/namespace
# Don't set Reads -- it should be toggled true by setting PublicReads
Capabilities: ["PublicReads", "Writes", "DirectReads"]
23 changes: 2 additions & 21 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pelicanplatform/pelican/classads"
"github.com/pelicanplatform/pelican/client"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -420,7 +421,7 @@ func runPluginWorker(ctx context.Context, upload bool, workChan <-chan PluginTra
break
}
// Check we have valid query parameters
err := checkValidQuery(transfer.url)
err := utils.CheckValidQuery(transfer.url, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -539,26 +540,6 @@ func parseDestination(transfer PluginTransfer) (parsedDest string) {
return transfer.localFile
}

// This function checks if we have a valid query (or no query) for the transfer URL
func checkValidQuery(transferUrl *url.URL) (err error) {
query := transferUrl.Query()
_, hasRecursive := query["recursive"]
_, hasPack := query["pack"]
_, hasDirectRead := query["directread"]

// If we have both recursive and pack, we should return a failure
if hasRecursive && hasPack {
return fmt.Errorf("Cannot have both recursive and pack query parameters")
}

// If we have no query, or we have recursive or pack, we are good
if len(query) == 0 || hasRecursive || hasPack || hasDirectRead {
return nil
}

return fmt.Errorf("Invalid query parameters provided in url: %s", transferUrl)
}

// WriteOutfile takes in the result ads from the job and the file to be outputted, it returns a boolean indicating:
// true: all result ads indicate transfer success
// false: at least one result ad has failed
Expand Down
Loading

0 comments on commit a58d404

Please sign in to comment.