Skip to content

Commit

Permalink
Merge pull request #627 from joereuss12/fixes-to-622-advertise-origin
Browse files Browse the repository at this point in the history
Fixes to origin/director advertise to client
  • Loading branch information
joereuss12 authored Jan 10, 2024
2 parents a569faa + c719bab commit ca26ba8
Show file tree
Hide file tree
Showing 17 changed files with 167 additions and 146 deletions.
8 changes: 5 additions & 3 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ func CreateNsFromDirectorResp(dirResp *http.Response) (namespace namespaces.Name
return
}

func QueryDirector(source string, directorUrl string) (resp *http.Response, err error) {
// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(verb, source, directorUrl string) (resp *http.Response, err error) {
resourceUrl := directorUrl + source
// Here we use http.Transport to prevent the client from following the director's
// redirect. We use the Location url elsewhere (plus we still need to do the token
Expand All @@ -138,7 +140,7 @@ func QueryDirector(source string, directorUrl string) (resp *http.Response, err
},
}

req, err := http.NewRequest("GET", resourceUrl, nil)
req, err := http.NewRequest(verb, resourceUrl, nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
Expand Down Expand Up @@ -172,7 +174,7 @@ func QueryDirector(source string, directorUrl string) (resp *http.Response, err
if unmarshalErr := json.Unmarshal(body, &respErr); unmarshalErr != nil { // Error creating json
return nil, errors.Wrap(unmarshalErr, "Could not unmarshall the director's response")
}
return nil, errors.Errorf("The director reported an error: %s\n", respErr.Error)
return resp, errors.Errorf("The director reported an error: %s", respErr.Error)
}

return
Expand Down
5 changes: 3 additions & 2 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package client

import (
"bytes"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"

namespaces "github.com/pelicanplatform/pelican/namespaces"
)

Expand Down Expand Up @@ -194,7 +195,7 @@ func TestQueryDirector(t *testing.T) {
defer server.Close()

// Call QueryDirector with the test server URL and a source path
actualResp, err := QueryDirector("/foo/bar", server.URL)
actualResp, err := queryDirector("GET", "/foo/bar", server.URL)
if err != nil {
t.Fatal(err)
}
Expand Down
39 changes: 7 additions & 32 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package client

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -849,40 +848,16 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
nonZeroSize = fileInfo.Size() > 0
}

// call a GET on the director, director will respond with our endpoint
directorUrlStr := param.Federation_DirectorUrl.GetString()
directorUrl, err := url.Parse(directorUrlStr)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director url")
}
directorUrl.Path, err = url.JoinPath("/api/v1.0/director/origin", origDest.Path)
if err != nil {
return 0, errors.Wrap(err, "failed to parse director path for upload")
}

payload := []byte("forPUT")
req, err := http.NewRequest("GET", directorUrl.String(), bytes.NewBuffer(payload))
// Parse the writeback host as a URL
writebackhostUrl, err := url.Parse(namespace.WriteBackHost)
if err != nil {
return 0, errors.Wrap(err, "failed to construct request for director-origin query")
return 0, err
}

client := &http.Client{
Transport: config.GetTransport(),
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
if err != nil {
return 0, errors.Wrap(err, "failed to send request to director to obtain upload endpoint")
}
if resp.StatusCode == 405 {
return 0, errors.New("Error 405: No writeable origins were found")
}
defer resp.Body.Close()
dest, err := url.Parse(resp.Header.Get("Location"))
if err != nil {
return 0, errors.Wrap(err, "failed to parse location header from director response")
dest := &url.URL{
Host: writebackhostUrl.Host,
Scheme: "https",
Path: origDest.Path,
}

// Create the wrapped reader and send it to the request
Expand Down
11 changes: 6 additions & 5 deletions client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func TestFailedUpload(t *testing.T) {
func generateFileTestScitoken() (string, error) {
// Issuer is whichever server that initiates the test, so it's the server itself
issuerUrl := param.Origin_Url.GetString()
if issuerUrl == "" { // if both are empty, then error
if issuerUrl == "" { // if empty, then error
return "", errors.New("Failed to create token: Invalid iss, Server_ExternalWebUrl is empty")
}
jti_bytes := make([]byte, 16)
Expand All @@ -400,7 +400,7 @@ func generateFileTestScitoken() (string, error) {
Claim("wlcg.ver", "1.0").
JwtID(jti).
Issuer(issuerUrl).
Audience([]string{"https://wlcg.cern.ch/jwt/v1/any"}).
Audience([]string{param.Origin_Url.GetString()}).
Subject("origin").
Expiration(time.Now().Add(time.Minute)).
IssuedAt(time.Now()).
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestFullUpload(t *testing.T) {
viper.Set("ConfigDir", tmpPath)

// Increase the log level; otherwise, its difficult to debug failures
// viper.Set("Logging.Level", "Debug")
viper.Set("Logging.Level", "Debug")
config.InitConfig()

originDir, err := os.MkdirTemp("", "Origin")
Expand All @@ -469,10 +469,11 @@ func TestFullUpload(t *testing.T) {
viper.Set("Origin.EnableCmsd", false)
viper.Set("Origin.EnableMacaroons", false)
viper.Set("Origin.EnableVoms", false)
viper.Set("Origin.WriteEnabled", true)
viper.Set("Origin.EnableWrite", true)
viper.Set("TLSSkipVerify", true)
viper.Set("Server.EnableUI", false)
viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite"))
viper.Set("Xrootd.RunLocation", tmpPath)

err = config.InitServer(ctx, modules)
require.NoError(t, err)
Expand Down Expand Up @@ -525,7 +526,7 @@ func TestFullUpload(t *testing.T) {
defer os.Remove(tempToken.Name())
_, err = tempToken.WriteString(token)
assert.NoError(t, err, "Error writing to temp token file")
tempFile.Close()
tempToken.Close()
ObjectClientOptions.Token = tempToken.Name()

// Upload the file
Expand Down
118 changes: 64 additions & 54 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"net/url"
"regexp"
"runtime/debug"
Expand Down Expand Up @@ -412,6 +413,58 @@ func discoverHTCondorToken(tokenName string) string {
return tokenLocation
}

// Retrieve federation namespace information for a given URL.
// If OSDFDirectorUrl is non-empty, then the namespace information will be pulled from the director;
// otherwise, it is pulled from topology.
func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns namespaces.Namespace, err error) {
// If we have a director set, go through that for namespace info, otherwise use topology
if OSDFDirectorUrl != "" {
log.Debugln("Will query director at", OSDFDirectorUrl, "for object", resourcePath)
verb := "GET"
if isPut {
verb = "PUT"
}
var dirResp *http.Response
dirResp, err = queryDirector(verb, resourcePath, OSDFDirectorUrl)
if err != nil {
if isPut && dirResp != nil && dirResp.StatusCode == 405 {
err = errors.New("Error 405: No writeable origins were found")
AddError(err)
return
} else {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return
}
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return
}

// if we are doing a PUT, we need to get our endpoint from the director
if isPut {
var writeBackUrl *url.URL
location := dirResp.Header.Get("Location")
writeBackUrl, err = url.Parse(location)
if err != nil {
log.Errorf("The director responded with an invalid location (does not parse as URL: %v): %s", err, location)
return
}
ns.WriteBackHost = "https://" + writeBackUrl.Host
}
return
} else {
ns, err = namespaces.MatchNamespace(resourcePath)
if err != nil {
AddError(err)
return
}
return
}
}

// Start the transfer, whether read or write back
func DoStashCPSingle(sourceFile string, destination string, methods []string, recursive bool) (bytesTransferred int64, err error) {

Expand Down Expand Up @@ -499,41 +552,14 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
// For read it will be the source.

OSDFDirectorUrl := param.Federation_DirectorUrl.GetString()
useOSDFDirector := viper.IsSet("Federation.DirectorURL")
isPut := destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican"

if destScheme == "stash" || destScheme == "osdf" || destScheme == "pelican" {
log.Debugln("Detected writeback")
if !strings.HasPrefix(destination, "/") {
destination = strings.TrimPrefix(destination, destScheme+"://")
}
var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
directorOriginsUrl, err := url.Parse(OSDFDirectorUrl)
if err != nil {
return 0, err
}
directorOriginsUrl.Path, err = url.JoinPath("api", "v1.0", "director", "origin")
if err != nil {
return 0, err
}
dirResp, err := QueryDirector(destination, directorOriginsUrl.String())
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(dest_url.Path)
if err != nil {
AddError(err)
return 0, err
}
if isPut {
log.Debugln("Detected object write to remote federation object", dest_url.Path)
ns, err := getNamespaceInfo(dest_url.Path, OSDFDirectorUrl, isPut)
if err != nil {
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from destination")
}
uploadedBytes, err := doWriteBack(source_url.Path, dest_url, ns, recursive)
AddError(err)
Expand All @@ -552,26 +578,10 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
sourceFile = "/" + sourceFile
}

var ns namespaces.Namespace
// If we have a director set, go through that for namespace info, otherwise use topology
if useOSDFDirector {
dirResp, err := QueryDirector(sourceFile, OSDFDirectorUrl)
if err != nil {
log.Errorln("Error while querying the Director:", err)
AddError(err)
return 0, err
}
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
}
} else {
ns, err = namespaces.MatchNamespace(source_url.Path)
if err != nil {
AddError(err)
return 0, err
}
ns, err := getNamespaceInfo(sourceFile, OSDFDirectorUrl, isPut)
if err != nil {
log.Errorln(err)
return 0, errors.New("Failed to get namespace information from source")
}

// get absolute path
Expand Down
2 changes: 1 addition & 1 deletion client/sharing_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func CreateSharingUrl(objectUrl *url.URL, isWrite bool) (string, error) {
objectUrl.Path = "/" + strings.TrimPrefix(objectUrl.Path, "/")

log.Debugln("Will query director for path", objectUrl.Path)
dirResp, err := QueryDirector(objectUrl.Path, directorUrl)
dirResp, err := queryDirector("GET", objectUrl.Path, directorUrl)
if err != nil {
log.Errorln("Error while querying the Director:", err)
return "", errors.Wrapf(err, "Error while querying the director at %s", directorUrl)
Expand Down
2 changes: 1 addition & 1 deletion cmd/origin.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func init() {

// The -w flag is used if we want the origin to be writeable.
originServeCmd.Flags().BoolP("writeable", "", true, "Allow/disable writting to the origin")
if err := viper.BindPFlag("Origin.WriteEnabled", originServeCmd.Flags().Lookup("writeable")); err != nil {
if err := viper.BindPFlag("Origin.EnableWrite", originServeCmd.Flags().Lookup("writeable")); err != nil {
panic(err)
}

Expand Down
1 change: 1 addition & 0 deletions config/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Origin:
EnableMacaroons: false
EnableVoms: true
EnableUI: true
EnableWrite: true
SelfTest: true
Monitoring:
PortLower: 9930
Expand Down
2 changes: 1 addition & 1 deletion director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func parseServerAd(server utils.Server, serverType ServerType) ServerAd {
serverAd.Type = serverType
serverAd.Name = server.Resource

serverAd.WriteEnabled = param.Origin_WriteEnabled.GetBool()
serverAd.EnableWrite = param.Origin_EnableWrite.GetBool()
// url.Parse requires that the scheme be present before the hostname,
// but endpoints do not have a scheme. As such, we need to add one for the.
// correct parsing. Luckily, we don't use this anywhere else (it's just to
Expand Down
17 changes: 9 additions & 8 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type (
}

ServerAd struct {
Name string
AuthURL url.URL
URL url.URL // This is server's XRootD URL for file transfer
WebURL url.URL // This is server's Web interface and API
Type ServerType
Latitude float64
Longitude float64
WriteEnabled bool
Name string
AuthURL url.URL
URL url.URL // This is server's XRootD URL for file transfer
WebURL url.URL // This is server's Web interface and API
Type ServerType
Latitude float64
Longitude float64
EnableWrite bool
EnableFallbackRead bool // True if reads from the origin are permitted when no cache is available
}

ServerType string
Expand Down
11 changes: 6 additions & 5 deletions director/origin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ import (

type (
OriginAdvertise struct {
Name string `json:"name"`
URL string `json:"url"` // This is the url for origin's XRootD service and file transfer
WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs
Namespaces []NamespaceAd `json:"namespaces"`
WriteEnabled bool `json:"writeenabled"`
Name string `json:"name"`
URL string `json:"url"` // This is the url for origin's XRootD service and file transfer
WebURL string `json:"web_url,omitempty"` // This is the url for origin's web engine and APIs
Namespaces []NamespaceAd `json:"namespaces"`
EnableWrite bool `json:"enablewrite"`
EnableFallbackRead bool `json:"enable-fallback-read"` // True if the origin will allow direct client reads when no caches are available
}
)

Expand Down
Loading

0 comments on commit ca26ba8

Please sign in to comment.