Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Director advertise origin instead of writebackhost #622

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package client

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -848,16 +849,40 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
nonZeroSize = fileInfo.Size() > 0
}

// Parse the writeback host as a URL
writebackhostUrl, err := url.Parse(namespace.WriteBackHost)
// call a GET on the director, director will respond with our endpoint
directorUrlStr := param.Federation_DirectorUrl.GetString()
directorUrl, err := url.Parse(directorUrlStr)
if err != nil {
return 0, err
return 0, errors.Wrap(err, "failed to parse director url")
}
directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", origDest.Path)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director path for upload")
}

payload := []byte("forPUT")
req, err := http.NewRequest("GET", directorUrl.String(), bytes.NewBuffer(payload))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this calling GET in order to do a PUT? Shouldn't it just invoke a PUT and rely on the director to redirect?

What's the implications on backward compatibility here? Doesn't this break OSDF integration because it doesn't use the topology information when that's the preferred mechanism?

To avoid backward compat breaks, we should do something similar to what is done with downloads:
0. Based on configuration, perhaps take the topology code branches.

  1. If using the director, invoke a PUT to the director when doing the namespace resolution.
  2. Instead of following the redirect, look at the headers / redirect location header to determine the "real" location of the destination.
  3. Based on (2), fill in the namespace structure accordingly.
  4. Then, the upload code proceeds without knowing where the namespace information comes from.

For examples from the analogous GET case, see https://github.com/PelicanPlatform/pelican/blob/v7.3.3/client/director.go#L66

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a put? It's not doing an update to the director, is it? It's getting information.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbockelm Just want to make sure I am approaching this correctly:
So should I move this logic to around where we query the director/CreateNsFromDirectorResp within client/main.go and populate writebackhost with the correct value in cases we do not use topology. Then change uploadfile to get the destination from writebackhost in the namespace struct?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One tweak:

Then change uploadfile to get the destination from writebackhost in the namespace struct?

I'd say change uploadfile back to this behavior. Basically, revert the changes in that method back to where it was before.

Copy link
Collaborator

@bbockelm bbockelm Jan 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, another clarification:

move this logic to around where we query the director

The logic should also change slightly too. Instead of a GET with body contents, it should be a PUT.

if err != nil {
return 0, errors.Wrap(err, "failed to construct request for director-origin query")
}

dest := &url.URL{
Host: writebackhostUrl.Host,
Scheme: "https",
Path: origDest.Path,
client := &http.Client{
Transport: config.GetTransport(),
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return 0, errors.Wrap(err, "failed to send request to director to obtain upload endpoint")
}
if resp.StatusCode == 405 {
return 0, errors.New("Error 405: No writeable origins were found")
}
defer resp.Body.Close()
dest, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return 0, errors.Wrap(err, "failed to parse location header from director response")
}

// Create the wrapped reader and send it to the request
Expand Down Expand Up @@ -972,10 +997,9 @@ Loop:

}

var UploadClient = &http.Client{Transport: config.GetTransport()}

// Actually perform the Put request to the server
func doPut(request *http.Request, responseChan chan<- *http.Response, errorChan chan<- error) {
var UploadClient = &http.Client{Transport: config.GetTransport()}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the impact of this? Shouldn't clients be shared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this within the function because when it was a global variable, the transport was not being set correctly. For example, it did not recognize TLSSkipVerify being set to true and it was always false. Somehow it was getting the transport before any init functions in config were called.

client := UploadClient
dump, _ := httputil.DumpRequestOut(request, false)
log.Debugf("Dumping request: %s", dump)
Expand Down
222 changes: 184 additions & 38 deletions client/handle_http_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build !windows

/***************************************************************
*
* Copyright (C) 2023, University of Nebraska-Lincoln
Expand All @@ -20,6 +22,11 @@ package client

import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"io"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -31,11 +38,21 @@ import (
"testing"
"time"

"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/lestrrat-go/jwx/v2/jwt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/launchers"
"github.com/pelicanplatform/pelican/namespaces"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/test_utils"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -366,47 +383,176 @@ func TestFailedUpload(t *testing.T) {
}
}

func generateFileTestScitoken() (string, error) {
// Issuer is whichever server that initiates the test, so it's the server itself
issuerUrl := param.Origin_Url.GetString()
if issuerUrl == "" { // if both are empty, then error
return "", errors.New("Failed to create token: Invalid iss, Server_ExternalWebUrl is empty")
}
jti_bytes := make([]byte, 16)
if _, err := rand.Read(jti_bytes); err != nil {
return "", err
}
jti := base64.RawURLEncoding.EncodeToString(jti_bytes)

tok, err := jwt.NewBuilder().
Claim("scope", "storage.read:/ storage.modify:/").
Claim("wlcg.ver", "1.0").
JwtID(jti).
Issuer(issuerUrl).
Audience([]string{"https://wlcg.cern.ch/jwt/v1/any"}).
Subject("origin").
Expiration(time.Now().Add(time.Minute)).
IssuedAt(time.Now()).
Build()
if err != nil {
return "", err
}

key, err := config.GetIssuerPrivateJWK()
if err != nil {
return "", errors.Wrap(err, "Failed to load server's issuer key")
}

if err := jwk.AssignKeyID(key); err != nil {
return "", errors.Wrap(err, "Failed to assign kid to the token")
}

signed, err := jwt.Sign(tok, jwt.WithKey(jwa.ES256, key))
if err != nil {
return "", err
}

return string(signed), nil
}

func TestFullUpload(t *testing.T) {
testFileContent := "test file content"
ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Setup our test federation
ctx, cancel, egrp := test_utils.TestContext(context.Background(), t)
defer func() { require.NoError(t, egrp.Wait()) }()
defer cancel()

//t.Logf("%s", dump)
assert.Equal(t, "PUT", r.Method, "Not PUT Method")
_, err := w.Write([]byte(":)"))
assert.NoError(t, err)
}))
defer ts.Close()
viper.Reset()

// Create the temporary file to upload
tempFile, err := os.CreateTemp(t.TempDir(), "test")
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

// Create the namespace (only the write back host is read)
testURL, err := url.Parse(ts.URL)
assert.NoError(t, err, "Error parsing test URL")
testNamespace := namespaces.Namespace{
WriteBackHost: "https://" + testURL.Host,
modules := config.ServerType(0)
modules.Set(config.OriginType)
modules.Set(config.DirectorType)
modules.Set(config.RegistryType)

// Create our own temp directory (for some reason t.TempDir() does not play well with xrootd)
tmpPathPattern := "XRootD-Test_Origin*"
tmpPath, err := os.MkdirTemp("", tmpPathPattern)
require.NoError(t, err)

// Need to set permissions or the xrootd process we spawn won't be able to write PID/UID files
permissions := os.FileMode(0755)
err = os.Chmod(tmpPath, permissions)
require.NoError(t, err)

viper.Set("ConfigDir", tmpPath)

// Increase the log level; otherwise, its difficult to debug failures
// viper.Set("Logging.Level", "Debug")
config.InitConfig()

originDir, err := os.MkdirTemp("", "Origin")
assert.NoError(t, err)

// Change the permissions of the temporary directory
permissions = os.FileMode(0777)
err = os.Chmod(originDir, permissions)
require.NoError(t, err)

viper.Set("Origin.ExportVolume", originDir+":/test")
viper.Set("Origin.Mode", "posix")
// Disable functionality we're not using (and is difficult to make work on Mac)
viper.Set("Origin.EnableCmsd", false)
viper.Set("Origin.EnableMacaroons", false)
viper.Set("Origin.EnableVoms", false)
viper.Set("Origin.WriteEnabled", true)
viper.Set("TLSSkipVerify", true)
viper.Set("Server.EnableUI", false)
viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite"))

err = config.InitServer(ctx, modules)
require.NoError(t, err)

fedCancel, err := launchers.LaunchModules(ctx, modules)
defer fedCancel()
if err != nil {
log.Errorln("Failure in fedServeInternal:", err)
require.NoError(t, err)
}

// Upload the file
uploadURL, err := url.Parse("stash:///test/stuff/blah.txt")
assert.NoError(t, err, "Error parsing upload URL")
// Set the upload client to trust the server
UploadClient = ts.Client()
uploaded, err := UploadFile(tempFile.Name(), uploadURL, "Bearer test", testNamespace)
assert.NoError(t, err, "Error uploading file")
assert.Equal(t, int64(len(testFileContent)), uploaded, "Uploaded file size does not match")

// Upload an osdf file
uploadURL, err = url.Parse("osdf:///test/stuff/blah.txt")
assert.NoError(t, err, "Error parsing upload URL")
// Set the upload client to trust the server
UploadClient = ts.Client()
uploaded, err = UploadFile(tempFile.Name(), uploadURL, "Bearer test", testNamespace)
assert.NoError(t, err, "Error uploading file")
assert.Equal(t, int64(len(testFileContent)), uploaded, "Uploaded file size does not match")
desiredURL := param.Server_ExternalWebUrl.GetString() + "/.well-known/openid-configuration"
err = server_utils.WaitUntilWorking(ctx, "GET", desiredURL, "director", 200)
require.NoError(t, err)

httpc := http.Client{
Transport: config.GetTransport(),
}
resp, err := httpc.Get(desiredURL)
require.NoError(t, err)

assert.Equal(t, resp.StatusCode, http.StatusOK)

responseBody, err := io.ReadAll(resp.Body)
require.NoError(t, err)
expectedResponse := struct {
JwksUri string `json:"jwks_uri"`
}{}
err = json.Unmarshal(responseBody, &expectedResponse)
require.NoError(t, err)

assert.NotEmpty(t, expectedResponse.JwksUri)

t.Run("testFullUpload", func(t *testing.T) {
testFileContent := "test file content"

// Create the temporary file to upload
tempFile, err := os.CreateTemp(t.TempDir(), "test")
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

// Create a token file
token, err := generateFileTestScitoken()
assert.NoError(t, err)
tempToken, err := os.CreateTemp(t.TempDir(), "token")
assert.NoError(t, err, "Error creating temp token file")
defer os.Remove(tempToken.Name())
_, err = tempToken.WriteString(token)
assert.NoError(t, err, "Error writing to temp token file")
tempFile.Close()
ObjectClientOptions.Token = tempToken.Name()

// Upload the file
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := "stash:///test/" + fileName

methods := []string{"http"}
uploaded, err := DoStashCPSingle(tempFile.Name(), uploadURL, methods, false)
assert.NoError(t, err, "Error uploading file")
assert.Equal(t, int64(len(testFileContent)), uploaded, "Uploaded file size does not match")

// Upload an osdf file
uploadURL = "osdf:///test/stuff/blah.txt"
assert.NoError(t, err, "Error parsing upload URL")
uploaded, err = DoStashCPSingle(tempFile.Name(), uploadURL, methods, false)
assert.NoError(t, err, "Error uploading file")
assert.Equal(t, int64(len(testFileContent)), uploaded, "Uploaded file size does not match")
})
t.Cleanup(func() {
ObjectClientOptions.Token = ""
os.RemoveAll(tmpPath)
os.RemoveAll(originDir)
})

cancel()
fedCancel()
assert.NoError(t, egrp.Wait())
viper.Reset()
}
43 changes: 35 additions & 8 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,42 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
// For write back, it will be the destination
// For read it will be the source.

OSDFDirectorUrl := param.Federation_DirectorUrl.GetString()
useOSDFDirector := viper.IsSet("Federation.DirectorURL")
joereuss12 marked this conversation as resolved.
Show resolved Hide resolved

if destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican" {
log.Debugln("Detected writeback")
ns, err := namespaces.MatchNamespace(dest_url.Path)
if err != nil {
log.Errorln("Failed to get namespace information:", err)
AddError(err)
return 0, err
if !strings.HasPrefix(destination, "/") {
destination = strings.TrimPrefix(destination, destScheme+"://")
}
var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
joereuss12 marked this conversation as resolved.
Show resolved Hide resolved
directorOriginsUrl, err := url.Parse(OSDFDirectorUrl)
if err != nil {
return 0, err
}
directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin")
if err != nil {
return 0, err
}
dirResp, err := QueryDirector(destination, directorOriginsUrl.String())
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(dest_url.Path)
if err != nil {
AddError(err)
return 0, err
}
}
uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive)
AddError(err)
Expand All @@ -523,10 +552,8 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
sourceFile = "/" + sourceFile
}

OSDFDirectorUrl := param.Federation_DirectorUrl.GetString()
useOSDFDirector := viper.IsSet("Federation.DirectorURL")

var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/object_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func init() {

func copyMain(cmd *cobra.Command, args []string) {

client.ObjectClientOptions.Version = version
client.ObjectClientOptions.Version = config.PelicanVersion

// Need to check just stashcp since it does not go through root, the other modes get checked there
if strings.HasPrefix(execName, "stashcp") {
Expand Down
Loading
Loading