Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1166 from joereuss12/cache-bypass-…
Browse files Browse the repository at this point in the history
…branch

Implement client direct reads
  • Loading branch information
jhiemstrawisc authored May 3, 2024
2 parents f317b5a + 2763403 commit 1827884
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 71 deletions.
6 changes: 3 additions & 3 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func CreateNsFromDirectorResp(dirResp *http.Response) (namespace namespaces.Name

// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(ctx context.Context, verb, source, directorUrl string) (resp *http.Response, err error) {
resourceUrl := directorUrl + source
func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) {
resourceUrl := directorUrl + sourcePath
// Here we use http.Transport to prevent the client from following the director's
// redirect. We use the Location url elsewhere (plus we still need to do the token
// dance!)
Expand Down Expand Up @@ -162,7 +162,7 @@ func queryDirector(ctx context.Context, verb, source, directorUrl string) (resp
if unmarshalErr := json.Unmarshal(body, &respErr); unmarshalErr != nil { // Error creating json
return nil, errors.Wrap(unmarshalErr, "Could not unmarshall the director's response")
}
return resp, errors.Errorf("The director reported an error: %s", respErr.Error)
return resp, errors.New(respErr.Error)
}

return
Expand Down
109 changes: 109 additions & 0 deletions client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ var (

//go:embed resources/one-pub-one-auth.yml
mixedAuthOriginCfg string

//go:embed resources/pub-export-no-directread.yml
pubExportNoDirectRead string

//go:embed resources/pub-origin-no-directread.yml
pubOriginNoDirectRead string
)

// A test that spins up a federation, and tests object get and put
Expand Down Expand Up @@ -505,3 +511,106 @@ func TestStatHttp(t *testing.T) {
assert.Contains(t, err.Error(), "Do not understand the destination scheme: some. Permitted values are file, osdf, pelican, stash, ")
})
}

// TestDirectReads exercises the client's direct reads feature (?directread).
//
// Each subtest spins up a fresh federation, drops a small object into the first
// export's storage prefix and attempts to download it through a pelican:// URL
// carrying the ?directread query:
//   - testDirectReadsSuccess expects the transfer to succeed, the object not to
//     appear under the cache's data location, and every attempt to have targeted
//     the origin.
//   - testDirectReadsDirectReadFalseByOrigin and
//     testDirectReadsDirectReadFalseByNamespace expect the download to fail with
//     the "No origins on specified endpoint have direct reads enabled" error.
func TestDirectReads(t *testing.T) {
	defer viper.Reset()

	const testFileContent = "test file content"

	// stageObject writes testFileContent to "test.txt" under storagePrefix,
	// registers the file for removal when the calling (sub)test finishes, and
	// returns the object's base name together with the ?directread download URL.
	// It must be called after the federation is up, because the URL is built
	// from the server hostname/port parameters.
	stageObject := func(t *testing.T, storagePrefix, federationPrefix string) (objectName, directReadURL string) {
		t.Helper()

		// Use require (not assert): continuing with a nil *os.File would panic.
		tempFile, err := os.Create(filepath.Join(storagePrefix, "test.txt"))
		require.NoError(t, err, "Error creating temp file")
		t.Cleanup(func() {
			// Best-effort cleanup; a failure to remove must not fail the test.
			_ = os.Remove(tempFile.Name())
		})

		// Always close the file, even when the write failed, before reporting.
		_, writeErr := tempFile.WriteString(testFileContent)
		closeErr := tempFile.Close()
		require.NoError(t, writeErr, "Error writing to temp file")
		require.NoError(t, closeErr, "Error closing temp file")

		viper.Set("Logging.DisableProgressBars", true)

		objectName = filepath.Base(tempFile.Name())
		directReadURL = fmt.Sprintf("pelican://%s:%s%s/%s?directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
			federationPrefix, objectName)
		return objectName, directReadURL
	}

	t.Run("testDirectReadsSuccess", func(t *testing.T) {
		viper.Reset()
		server_utils.ResetOriginExports()
		viper.Set("Origin.EnableDirectReads", true)
		fed := fed_test_utils.NewFedTest(t, bothPublicOriginCfg)
		export := fed.Exports[0]
		objectName, directReadURL := stageObject(t, export.StoragePrefix, export.FederationPrefix)

		// Download the file with GET. Shouldn't need a token to succeed
		transferResults, err := client.DoGet(fed.Ctx, directReadURL, t.TempDir(), false)
		require.NoError(t, err)
		// Guard the index below so an empty result fails cleanly instead of panicking.
		require.NotEmpty(t, transferResults)
		assert.Equal(t, int64(len(testFileContent)), transferResults[0].TransferredBytes)

		// Assert that the file was not cached
		cacheDataLocation := param.Cache_DataLocation.GetString() + export.FederationPrefix
		cachedObjectPath := filepath.Join(cacheDataLocation, objectName)
		_, err = os.Stat(cachedObjectPath)
		assert.True(t, os.IsNotExist(err))

		// Assert our endpoint was the origin and not the cache
		for _, attempt := range transferResults[0].Attempts {
			assert.Equal(t, param.Origin_Url.GetString(), "https://"+attempt.Endpoint)
		}
	})

	// Test that direct reads fail if DirectReads=false is set for origin config but true for namespace/export
	t.Run("testDirectReadsDirectReadFalseByOrigin", func(t *testing.T) {
		viper.Reset()
		server_utils.ResetOriginExports()
		fed := fed_test_utils.NewFedTest(t, pubOriginNoDirectRead)
		export := fed.Exports[0]
		_, directReadURL := stageObject(t, export.StoragePrefix, export.FederationPrefix)

		// Attempt the download; it must be rejected because direct reads are disabled
		_, err := client.DoGet(fed.Ctx, directReadURL, t.TempDir(), false)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "No origins on specified endpoint have direct reads enabled")
	})

	// Test that direct reads fail if DirectReads=false is set for namespace/export config but true for origin
	t.Run("testDirectReadsDirectReadFalseByNamespace", func(t *testing.T) {
		viper.Reset()
		server_utils.ResetOriginExports()
		fed := fed_test_utils.NewFedTest(t, pubExportNoDirectRead)
		export := fed.Exports[0]
		// NOTE(review): export is a local taken from fed.Exports[0]; if Exports holds
		// struct values this only changes a copy and has no effect on the running
		// federation (the YAML config already omits DirectReads) — confirm intent.
		export.Capabilities.DirectReads = false
		_, directReadURL := stageObject(t, export.StoragePrefix, export.FederationPrefix)

		// Attempt the download; it must be rejected because direct reads are disabled
		_, err := client.DoGet(fed.Ctx, directReadURL, t.TempDir(), false)
		require.Error(t, err)
		assert.Contains(t, err.Error(), "No origins on specified endpoint have direct reads enabled")
	})
}
3 changes: 2 additions & 1 deletion client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1076,10 +1076,11 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL
tj.useDirector = true
tj.directorUrl = pelicanURL.directorUrl
}
ns, err := getNamespaceInfo(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload)
ns, err := getNamespaceInfo(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery)
if err != nil {
log.Errorln(err)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String())
return
}
tj.namespace = ns

Expand Down
36 changes: 33 additions & 3 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/namespaces"
"github.com/pelicanplatform/pelican/utils"
)

// Number of caches to attempt to use in any invocation
Expand Down Expand Up @@ -193,7 +194,7 @@ func DoStat(ctx context.Context, destination string, options ...TransferOption)
return 0, errors.Wrap(err, "Failed to generate pelicanURL object")
}

ns, err := getNamespaceInfo(ctx, destUri.Path, pelicanURL.directorUrl, false)
ns, err := getNamespaceInfo(ctx, destUri.Path, pelicanURL.directorUrl, false, "")
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -230,7 +231,7 @@ func GetCacheHostnames(ctx context.Context, testFile string) (urls []string, err
if err != nil {
return
}
ns, err := getNamespaceInfo(ctx, testFile, fedInfo.DirectorEndpoint, false)
ns, err := getNamespaceInfo(ctx, testFile, fedInfo.DirectorEndpoint, false, "")
if err != nil {
return
}
Expand Down Expand Up @@ -404,14 +405,17 @@ func discoverHTCondorToken(tokenName string) string {
// Retrieve federation namespace information for a given URL.
// If OSDFDirectorUrl is non-empty, then the namespace information will be pulled from the director;
// otherwise, it is pulled from topology.
func getNamespaceInfo(ctx context.Context, resourcePath, OSDFDirectorUrl string, isPut bool) (ns namespaces.Namespace, err error) {
func getNamespaceInfo(ctx context.Context, resourcePath, OSDFDirectorUrl string, isPut bool, query string) (ns namespaces.Namespace, err error) {
// If we have a director set, go through that for namespace info, otherwise use topology
if OSDFDirectorUrl != "" {
log.Debugln("Will query director at", OSDFDirectorUrl, "for object", resourcePath)
verb := "GET"
if isPut {
verb = "PUT"
}
if query != "" {
resourcePath += "?" + query
}
var dirResp *http.Response
dirResp, err = queryDirector(ctx, verb, resourcePath, OSDFDirectorUrl)
if err != nil {
Expand Down Expand Up @@ -485,6 +489,13 @@ func DoPut(ctx context.Context, localObject string, remoteDestination string, re
log.Errorln("Failed to parse remote destination URL:", err)
return nil, err
}

// Check if we have a query and that it is understood
err = utils.CheckValidQuery(remoteDestUrl, false)
if err != nil {
return
}

remoteDestUrl.Scheme = remoteDestScheme

remoteDestScheme, _ = getTokenName(remoteDestUrl)
Expand Down Expand Up @@ -549,6 +560,13 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re
log.Errorln("Failed to parse source URL:", err)
return nil, err
}

// Check if we have a query and that it is understood
err = utils.CheckValidQuery(remoteObjectUrl, false)
if err != nil {
return
}

remoteObjectUrl.Scheme = remoteObjectScheme

// This is for condor cases:
Expand Down Expand Up @@ -669,6 +687,11 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv
log.Errorln("Failed to parse source URL:", err)
return nil, err
}
// Check if we have a query and that it is understood
err = utils.CheckValidQuery(sourceURL, false)
if err != nil {
return
}
sourceURL.Scheme = source_scheme

destination, dest_scheme := correctURLWithUnderscore(destination)
Expand All @@ -677,6 +700,13 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv
log.Errorln("Failed to parse destination URL:", err)
return nil, err
}

// Check if we have a query and that it is understood
err = utils.CheckValidQuery(destURL, false)
if err != nil {
return
}

destURL.Scheme = dest_scheme

// Check for scheme here for when using condor
Expand Down
13 changes: 13 additions & 0 deletions client/resources/pub-export-no-directread.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Origin export configuration for testing direct reads: the origin has direct reads
# enabled, but the exported namespace does not list the DirectReads capability

Origin:
# Things that configure the origin itself
StorageType: "posix"
EnableDirectReads: true
# The actual namespaces we export
Exports:
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
FederationPrefix: /first/namespace
# Don't set Reads -- it should be toggled true by setting PublicReads
Capabilities: ["PublicReads", "Writes"]
19 changes: 19 additions & 0 deletions client/resources/pub-origin-no-directread.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Origin export configuration for testing direct reads: the origin has direct reads
# disabled, but the exported namespaces list the DirectReads capability

Origin:
# Things that configure the origin itself
StorageType: "posix"
EnableDirectReads: false
# The actual namespaces we export
Exports:
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
FederationPrefix: /first/namespace
# Don't set Reads -- it should be toggled true by setting PublicReads
Capabilities: ["PublicReads", "Writes", "DirectReads"]
# An origin assumes the capabilities of its namespace when it exports only one,
# so this second namespace is added to override that behavior
- StoragePrefix: /<SHOULD BE OVERRIDDEN>
FederationPrefix: /second/namespace
# Don't set Reads -- it should be toggled true by setting PublicReads
Capabilities: ["PublicReads", "Writes", "DirectReads"]
21 changes: 1 addition & 20 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func runPluginWorker(ctx context.Context, upload bool, workChan <-chan PluginTra
break
}
// Check we have valid query parameters
err := checkValidQuery(transfer.url)
err := utils.CheckValidQuery(transfer.url, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -537,25 +537,6 @@ func parseDestination(transfer PluginTransfer) (parsedDest string) {
return transfer.localFile
}

// checkValidQuery reports whether the query string of transferUrl is one the
// plugin understands.
//
// It returns nil when the URL carries no query parameters, or when it carries
// "recursive" or "pack" (any additional parameters are then tolerated). It
// returns an error when both "recursive" and "pack" are present, or when the
// query is non-empty but contains neither of them.
func checkValidQuery(transferUrl *url.URL) (err error) {
	query := transferUrl.Query()
	_, hasRecursive := query["recursive"]
	_, hasPack := query["pack"]

	switch {
	case hasRecursive && hasPack:
		// The two modes are mutually exclusive.
		return fmt.Errorf("Cannot have both recursive and pack query parameters")
	case len(query) == 0 || hasRecursive || hasPack:
		// No query at all, or exactly one of the understood parameters.
		return nil
	}

	return fmt.Errorf("Invalid query parameters provided in url: %s", transferUrl)
}

// WriteOutfile takes in the result ads from the job and the file to be outputted, it returns a boolean indicating:
// true: all result ads indicate transfer success
// false: at least one result ad has failed
Expand Down
Loading

0 comments on commit 1827884

Please sign in to comment.