Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes to origin/director advertise to client #627

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ func CreateNsFromDirectorResp(dirResp *http.Response) (namespace namespaces.Name
return
}

func QueryDirector(source string, directorUrl string) (resp *http.Response, err error) {
// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(verb, source, directorUrl string) (resp *http.Response, err error) {
resourceUrl := directorUrl + source
// Here we use http.Transport to prevent the client from following the director's
// redirect. We use the Location url elsewhere (plus we still need to do the token
Expand All @@ -138,7 +140,7 @@ func QueryDirector(source string, directorUrl string) (resp *http.Response, err
},
}

req, err := http.NewRequest("GET", resourceUrl, nil)
req, err := http.NewRequest(verb, resourceUrl, nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
Expand Down Expand Up @@ -172,7 +174,7 @@ func QueryDirector(source string, directorUrl string) (resp *http.Response, err
if unmarshalErr := json.Unmarshal(body, &respErr); unmarshalErr != nil { // Error creating json
return nil, errors.Wrap(unmarshalErr, "Could not unmarshall the director's response")
}
return nil, errors.Errorf("The director reported an error: %s\n", respErr.Error)
return resp, errors.Errorf("The director reported an error: %s", respErr.Error)
}

return
Expand Down
5 changes: 3 additions & 2 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package client

import (
"bytes"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"

namespaces "github.com/pelicanplatform/pelican/namespaces"
)

Expand Down Expand Up @@ -194,7 +195,7 @@ func TestQueryDirector(t *testing.T) {
defer server.Close()

// Call QueryDirector with the test server URL and a source path
actualResp, err := QueryDirector("/foo/bar", server.URL)
actualResp, err := queryDirector("GET", "/foo/bar", server.URL)
if err != nil {
t.Fatal(err)
}
Expand Down
39 changes: 7 additions & 32 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package client

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -849,40 +848,16 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
nonZeroSize = fileInfo.Size() > 0
}

// call a GET on the director, director will respond with our endpoint
directorUrlStr := param.Federation_DirectorUrl.GetString()
directorUrl, err := url.Parse(directorUrlStr)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director url")
}
directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", origDest.Path)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director path for upload")
}

payload := []byte("forPUT")
req, err := http.NewRequest("GET", directorUrl.String(), bytes.NewBuffer(payload))
// Parse the writeback host as a URL
writebackhostUrl, err := url.Parse(namespace.WriteBackHost)
if err != nil {
return 0, errors.Wrap(err, "failed to construct request for director-origin query")
return 0, err
}

client := &http.Client{
Transport: config.GetTransport(),
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return 0, errors.Wrap(err, "failed to send request to director to obtain upload endpoint")
}
if resp.StatusCode == 405 {
return 0, errors.New("Error 405: No writeable origins were found")
}
defer resp.Body.Close()
dest, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return 0, errors.Wrap(err, "failed to parse location header from director response")
dest := &url.URL{
Host: writebackhostUrl.Host,
Scheme: "https",
Path: origDest.Path,
}

// Create the wrapped reader and send it to the request
Expand Down
11 changes: 6 additions & 5 deletions client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func TestFailedUpload(t *testing.T) {
func generateFileTestScitoken() (string, error) {
// Issuer is whichever server that initiates the test, so it's the server itself
issuerUrl := param.Origin_Url.GetString()
if issuerUrl == "" { // if both are empty, then error
if issuerUrl == "" { // if empty, then error
return "", errors.New("Failed to create token: Invalid iss, Server_ExternalWebUrl is empty")
}
jti_bytes := make([]byte, 16)
Expand All @@ -400,7 +400,7 @@ func generateFileTestScitoken() (string, error) {
Claim("wlcg.ver", "1.0").
JwtID(jti).
Issuer(issuerUrl).
Audience([]string{"https://wlcg.cern.ch/jwt/v1/any"}).
Audience([]string{param.Origin_Url.GetString()}).
Subject("origin").
Expiration(time.Now().Add(time.Minute)).
IssuedAt(time.Now()).
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestFullUpload(t *testing.T) {
viper.Set("ConfigDir", tmpPath)

// Increase the log level; otherwise, its difficult to debug failures
// viper.Set("Logging.Level", "Debug")
viper.Set("Logging.Level", "Debug")
config.InitConfig()

originDir, err := os.MkdirTemp("", "Origin")
Expand All @@ -469,10 +469,11 @@ func TestFullUpload(t *testing.T) {
viper.Set("Origin.EnableCmsd", false)
viper.Set("Origin.EnableMacaroons", false)
viper.Set("Origin.EnableVoms", false)
viper.Set("Origin.WriteEnabled", true)
viper.Set("Origin.EnableWrite", true)
viper.Set("TLSSkipVerify", true)
viper.Set("Server.EnableUI", false)
viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite"))
viper.Set("Xrootd.RunLocation", tmpPath)

err = config.InitServer(ctx, modules)
require.NoError(t, err)
Expand Down Expand Up @@ -525,7 +526,7 @@ func TestFullUpload(t *testing.T) {
defer os.Remove(tempToken.Name())
_, err = tempToken.WriteString(token)
assert.NoError(t, err, "Error writing to temp token file")
tempFile.Close()
tempToken.Close()
ObjectClientOptions.Token = tempToken.Name()

// Upload the file
Expand Down
118 changes: 64 additions & 54 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"net/url"
"regexp"
"runtime/debug"
Expand Down Expand Up @@ -412,6 +413,58 @@ func discoverHTCondorToken(tokenName string) string {
return tokenLocation
}

// Retrieve federation namespace information for a given URL.
// If OSDFDirectorUrl is non-empty, then the namespace information will be pulled from the director;
// otherwise, it is pulled from topology.
func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns namespaces.Namespace, err error) {
// If we have a director set, go through that for namespace info, otherwise use topology
if OSDFDirectorUrl != "" {
log.Debugln("Will query director at", OSDFDirectorUrl, "for object", resourcePath)
verb := "GET"
if isPut {
verb = "PUT"
}
var dirResp *http.Response
dirResp, err = queryDirector(verb, resourcePath, OSDFDirectorUrl)
if err != nil {
if isPut && dirResp != nil && dirResp.StatusCode == 405 {
err = errors.New("Error 405: No writeable origins were found")
AddError(err)
return
} else {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return
}
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return
}

// if we are doing a PUT, we need to get our endpoint from the director
if isPut {
var writeBackUrl *url.URL
location := dirResp.Header.Get("Location")
writeBackUrl, err = url.Parse(location)
if err != nil {
log.Errorf("The director responded with an invalid location (does not parse as URL: %v): %s", err, location)
return
}
ns.WriteBackHost = "https://" + writeBackUrl.Host
}
return
} else {
ns, err = namespaces.MatchNamespace(resourcePath)
if err != nil {
AddError(err)
return
}
return
}
}

// Start the transfer, whether read or write back
func DoStashCPSingle(sourceFile string, destination string, methods []string, recursive bool) (bytesTransferred int64, err error) {

Expand Down Expand Up @@ -499,41 +552,14 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
// For read it will be the source.

OSDFDirectorUrl := param.Federation_DirectorUrl.GetString()
useOSDFDirector := viper.IsSet("Federation.DirectorURL")
isPut := destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican"

if destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican" {
log.Debugln("Detected writeback")
if !strings.HasPrefix(destination, "/") {
destination = strings.TrimPrefix(destination, destScheme+"://")
}
var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
directorOriginsUrl, err := url.Parse(OSDFDirectorUrl)
if err != nil {
return 0, err
}
directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin")
if err != nil {
return 0, err
}
dirResp, err := QueryDirector(destination, directorOriginsUrl.String())
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(dest_url.Path)
if err != nil {
AddError(err)
return 0, err
}
if isPut {
log.Debugln("Detected object write to remote federation object", dest_url.Path)
ns, err := getNamespaceInfo(dest_url.Path, OSDFDirectorUrl, isPut)
if err != nil {
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from destination")
}
uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive)
AddError(err)
Expand All @@ -552,26 +578,10 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
sourceFile = "/" + sourceFile
}

var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl)
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(source_url.Path)
if err != nil {
AddError(err)
return 0, err
}
ns, err := getNamespaceInfo(sourceFile, OSDFDirectorUrl, isPut)
if err != nil {
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from source")
}

// get absolute path
Expand Down
2 changes: 1 addition & 1 deletion client/sharing_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CreateSharingUrl(objectUrl *url.URL, isWrite bool) (string, error) {
objectUrl.Path = "/" + strings.TrimPrefix(objectUrl.Path, "/")

log.Debugln("Will query director for path", objectUrl.Path)
dirResp, err := QueryDirector(objectUrl.Path, directorUrl)
dirResp, err := queryDirector("GET", objectUrl.Path, directorUrl)
if err != nil {
log.Errorln("Error while querying the Director:", err)
return "", errors.Wrapf(err, "Error while querying the director at %s", directorUrl)
Expand Down
2 changes: 1 addition & 1 deletion cmd/origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func init() {

// The -w flag is used if we want the origin to be writeable.
originServeCmd.Flags().BoolP("writeable", "", true, "Allow/disable writting to the origin")
if err := viper.BindPFlag("Origin.WriteEnabled", originServeCmd.Flags().Lookup("writeable")); err != nil {
if err := viper.BindPFlag("Origin.EnableWrite", originServeCmd.Flags().Lookup("writeable")); err != nil {
panic(err)
}

Expand Down
1 change: 1 addition & 0 deletions config/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Origin:
EnableMacaroons: false
EnableVoms: true
EnableUI: true
EnableWrite: true
SelfTest: true
Monitoring:
PortLower: 9930
Expand Down
2 changes: 1 addition & 1 deletion director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func parseServerAd(server utils.Server, serverType ServerType) ServerAd {
serverAd.Type = serverType
serverAd.Name = server.Resource

serverAd.WriteEnabled = param.Origin_WriteEnabled.GetBool()
serverAd.EnableWrite = param.Origin_EnableWrite.GetBool()
// url.Parse requires that the scheme be present before the hostname,
// but endpoints do not have a scheme. As such, we need to add one for the.
// correct parsing. Luckily, we don't use this anywhere else (it's just to
Expand Down
17 changes: 9 additions & 8 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type (
}

ServerAd struct {
Name string
AuthURL url.URL
URL url.URL // This is server's XRootD URL for file transfer
WebURL url.URL // This is server's Web interface and API
Type ServerType
Latitude float64
Longitude float64
WriteEnabled bool
Name string
AuthURL url.URL
URL url.URL // This is server's XRootD URL for file transfer
WebURL url.URL // This is server's Web interface and API
Type ServerType
Latitude float64
Longitude float64
EnableWrite bool
EnableFallbackRead bool // True if reads from the origin are permitted when no cache is available
}

ServerType string
Expand Down
11 changes: 6 additions & 5 deletions director/origin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import (

type (
OriginAdvertise struct {
Name string `json:"name"`
URL string `json:"url"` // This is the url for origin's XRootD service and file transfer
WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs
Namespaces []NamespaceAd `json:"namespaces"`
WriteEnabled bool `json:"writeenabled"`
Name string `json:"name"`
URL string `json:"url"` // This is the url for origin's XRootD service and file transfer
WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs
Namespaces []NamespaceAd `json:"namespaces"`
EnableWrite bool `json:"enablewrite"`
EnableFallbackRead bool `json:"enable-fallback-read"` // True if the origin will allow direct client reads when no caches are available
}
)

Expand Down
Loading
Loading