Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement creation of a sharing URL #257

Merged
merged 8 commits into from
Oct 30, 2023
14 changes: 8 additions & 6 deletions client/acquire_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
oauth2_upstream "golang.org/x/oauth2"
)

func TokenIsAcceptable(jwtSerialized string, osdfPath string, namespace namespaces.Namespace, isWrite bool) bool {
func TokenIsAcceptable(jwtSerialized string, osdfPath string, namespace namespaces.Namespace, opts config.TokenGenerationOpts) bool {
parser := jwt.Parser{SkipClaimsValidation: true}
token, _, err := parser.ParseUnverified(jwtSerialized, &jwt.MapClaims{})
if err != nil {
Expand Down Expand Up @@ -71,7 +71,7 @@ func TokenIsAcceptable(jwtSerialized string, osdfPath string, namespace namespac
for _, scope := range strings.Split(scopes, " ") {
scope_info := strings.Split(scope, ":")
scopeOK := false
if isWrite && (scope_info[0] == "storage.modify" || scope_info[0] == "storage.create") {
if (opts.Operation == config.TokenWrite || opts.Operation == config.TokenSharedWrite) && (scope_info[0] == "storage.modify" || scope_info[0] == "storage.create") {
scopeOK = true
} else if scope_info[0] == "storage.read" {
scopeOK = true
Expand All @@ -84,7 +84,9 @@ func TokenIsAcceptable(jwtSerialized string, osdfPath string, namespace namespac
acceptableScope = true
break
}
if strings.HasPrefix(targetResource, scope_info[1]) {
// Shared URLs must have exact matches; otherwise, prefix matching is acceptable.
if ((opts.Operation == config.TokenSharedWrite || opts.Operation == config.TokenSharedRead) && (targetResource == scope_info[1])) ||
strings.HasPrefix(targetResource, scope_info[1]) {
acceptableScope = true
break
}
Expand Down Expand Up @@ -142,7 +144,7 @@ func RegisterClient(namespace namespaces.Namespace) (*config.PrefixEntry, error)

// Given a URL and a piece of the namespace, attempt to acquire a valid
// token for that URL.
func AcquireToken(destination *url.URL, namespace namespaces.Namespace, isWrite bool) (string, error) {
func AcquireToken(destination *url.URL, namespace namespaces.Namespace, opts config.TokenGenerationOpts) (string, error) {
log.Debugln("Acquiring a token from configuration and OAuth2")

if namespace.CredentialGen == nil || namespace.CredentialGen.Strategy == nil {
Expand Down Expand Up @@ -208,7 +210,7 @@ func AcquireToken(destination *url.URL, namespace namespaces.Namespace, isWrite
var acceptableToken *config.TokenEntry = nil
acceptableUnexpiredToken := ""
for idx, token := range prefixEntry.Tokens {
if !TokenIsAcceptable(token.AccessToken, destination.Path, namespace, isWrite) {
if !TokenIsAcceptable(token.AccessToken, destination.Path, namespace, opts) {
continue
}
if acceptableToken == nil {
Expand Down Expand Up @@ -262,7 +264,7 @@ func AcquireToken(destination *url.URL, namespace namespaces.Namespace, isWrite
}
}

token, err := oauth2.AcquireToken(issuer, prefixEntry, namespace.CredentialGen, destination.Path, isWrite)
token, err := oauth2.AcquireToken(issuer, prefixEntry, namespace.CredentialGen, destination.Path, opts)
if err != nil {
return "", err
}
Expand Down
8 changes: 6 additions & 2 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func HeaderParser(values string) (retMap map[string]string) {

// Given the Director response, create the ordered list of caches
// and store it as namespace.SortedDirectorCaches
func CreateNsFromDirectorResp(dirResp *http.Response, namespace *namespaces.Namespace) (err error) {
func CreateNsFromDirectorResp(dirResp *http.Response) (namespace namespaces.Namespace, err error) {
pelicanNamespaceHdr := dirResp.Header.Values("X-Pelican-Namespace")
if len(pelicanNamespaceHdr) == 0 {
return errors.New("Pelican director did not include mandatory X-Pelican-Namespace header in response")
err = errors.New("Pelican director did not include mandatory X-Pelican-Namespace header in response")
return
}
xPelicanNamespace := HeaderParser(pelicanNamespaceHdr[0])
namespace.Path = xPelicanNamespace["namespace"]
Expand Down Expand Up @@ -175,6 +176,9 @@ func QueryDirector(source string, directorUrl string) (resp *http.Response, err
func GetCachesFromDirectorResponse(resp *http.Response, needsToken bool) (caches []namespaces.DirectorCache, err error) {
// Get the Link header
linkHeader := resp.Header.Values("Link")
if len(linkHeader) == 0 {
return []namespaces.DirectorCache{}, nil
}

for _, linksStr := range strings.Split(linkHeader[0], ",") {
links := strings.Split(strings.ReplaceAll(linksStr, " ", ""), ";")
Expand Down
3 changes: 1 addition & 2 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func TestCreateNsFromDirectorResp(t *testing.T) {
}

// Call the function in question
var ns namespaces.Namespace
err := CreateNsFromDirectorResp(directorResponse, &ns)
ns, err := CreateNsFromDirectorResp(directorResponse)

// Test for expected outputs
assert.NoError(t, err, "Error creating Namespace from Director response")
Expand Down
8 changes: 6 additions & 2 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ func getToken(destination *url.URL, namespace namespaces.Namespace, isWrite bool

if token_location == "" {
if !ObjectClientOptions.Plugin {
value, err := AcquireToken(destination, namespace, isWrite)
opts := config.TokenGenerationOpts{Operation: config.TokenSharedRead}
if isWrite {
opts.Operation = config.TokenSharedWrite
}
value, err := AcquireToken(destination, namespace, opts)
if err == nil {
return value, nil
}
Expand Down Expand Up @@ -505,7 +509,7 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
AddError(err)
return 0, err
}
err = CreateNsFromDirectorResp(dirResp, &ns)
ns, err = CreateNsFromDirectorResp(dirResp)
if err != nil {
AddError(err)
return 0, err
Expand Down
106 changes: 106 additions & 0 deletions client/sharing_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/***************************************************************
*
* Copyright (C) 2023, University of Nebraska-Lincoln
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package client

import (
"net/url"
"strings"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

func getDirectorFromUrl(objectUrl *url.URL) (string, error) {
configDirectorUrl := param.Federation_DirectorUrl.GetString()
var directorUrl string
if objectUrl.Scheme == "pelican" {
if objectUrl.Host == "" {
if configDirectorUrl == "" {
return "", errors.New("Must specify (or configure) the federation hostname with the pelican://-style URLs")
}
directorUrl = configDirectorUrl
} else {
discoveryUrl := url.URL{
Scheme: "https",
Host: objectUrl.Host,
}
viper.Set("Federation.DirectorUrl", "")
viper.Set("Federation.DiscoveryUrl", discoveryUrl.String())
if err := config.DiscoverFederation(); err != nil {
return "", errors.Wrapf(err, "Failed to discover location of the director for the federation %s", objectUrl.Host)
}
if directorUrl = param.Federation_DirectorUrl.GetString(); directorUrl == "" {
return "", errors.Errorf("Director for the federation %s not discovered", objectUrl.Host)
}
}
} else if objectUrl.Scheme == "osdf" && configDirectorUrl == "" {
bbockelm marked this conversation as resolved.
Show resolved Hide resolved
if objectUrl.Host != "" {
objectUrl.Path = "/" + objectUrl.Host + objectUrl.Path
objectUrl.Host = ""
}
viper.Set("Federation.DiscoveryUrl", "https://osg-htc.org")
if err := config.DiscoverFederation(); err != nil {
return "", errors.Wrap(err, "Failed to discover director for the OSDF")
}
if directorUrl = param.Federation_DirectorUrl.GetString(); directorUrl == "" {
return "", errors.Errorf("Director for the OSDF not discovered")
}
} else if objectUrl.Scheme == "" {
if configDirectorUrl == "" {
return "", errors.Errorf("Must provide a federation name for path %s (e.g., pelican://osg-htc.org/%s)", objectUrl.Path, objectUrl.Path)
} else {
directorUrl = configDirectorUrl
}
} else if objectUrl.Scheme != "" {
return "", errors.Errorf("Unsupported scheme for pelican: %s://", objectUrl.Scheme)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a change request, per say, but I wonder how we could support having schemeless sharing. If I do osdf object share /chtc/staging/jhiemstra/chtc-auth-test.txt, might it be reasonable to generate a sharing URL that points to a cache or an origin? For example, if I curl -v https://origin-auth2000.chtc.wisc.edu:1095/chtc/staging/jhiemstra/chtc-auth-test.txt?authz=<TOKEN FROM RUNNING OBJECT SHARE>, I also get the file. We could generate this particular sharing URL when no scheme is provided.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I'm following -- the prior conditional handles the case for schemaless sharing, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I tested in a way that pointed at the wrong director, which caused a failure I misinterpreted. After pointing back to the production director, this appears to work as intended.

}
return directorUrl, nil
}

func CreateSharingUrl(objectUrl *url.URL, isWrite bool) (string, error) {
directorUrl, err := getDirectorFromUrl(objectUrl)
if err != nil {
return "", err
}
objectUrl.Path = "/" + strings.TrimPrefix(objectUrl.Path, "/")

log.Debugln("Will query director for path", objectUrl.Path)
dirResp, err := QueryDirector(objectUrl.Path, directorUrl)
if err != nil {
log.Errorln("Error while querying the Director:", err)
return "", errors.Wrapf(err, "Error while querying the director at %s", directorUrl)
}
namespace, err := CreateNsFromDirectorResp(dirResp)
if err != nil {
return "", errors.Wrapf(err, "Unable to parse response from director at %s", directorUrl)
}

opts := config.TokenGenerationOpts{Operation: config.TokenSharedRead}
if isWrite {
opts.Operation = config.TokenSharedWrite
}
token, err := AcquireToken(objectUrl, namespace, opts)
if err != nil {
err = errors.Wrap(err, "Failed to acquire token")
}
return token, err
}
170 changes: 170 additions & 0 deletions client/sharing_url_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/***************************************************************
*
* Copyright (C) 2023, University of Nebraska-Lincoln
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package client

import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"

"github.com/pelicanplatform/pelican/config"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestDirectorGeneration(t *testing.T) {
returnError := false
returnErrorRef := &returnError

handler := func(w http.ResponseWriter, r *http.Request) {
discoveryConfig := `{"director_endpoint": "https://location.example.com", "namespace_registration_endpoint": "https://location.example.com/namespace", "jwks_uri": "https://location.example.com/jwks"}`
if *returnErrorRef {
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(discoveryConfig))
assert.NoError(t, err)
}
}
server := httptest.NewTLSServer(http.HandlerFunc(handler))
defer server.Close()
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)

objectUrl := url.URL{
Scheme: "pelican",
Host: serverURL.Host,
Path: "/test/foo",
}

// Discovery works to get URL
viper.Reset()
viper.Set("TLSSkipVerify", true)
err = config.InitClient()
require.NoError(t, err)
dUrl, err := getDirectorFromUrl(&objectUrl)
require.NoError(t, err)
assert.Equal(t, dUrl, "https://location.example.com")

// Discovery URL overrides the federation config.
viper.Reset()
viper.Set("TLSSkipVerify", true)
viper.Set("Federation.DirectorURL", "https://location2.example.com")
dUrl, err = getDirectorFromUrl(&objectUrl)
require.NoError(t, err)
assert.Equal(t, dUrl, "https://location.example.com")

// Fallback to configuration if no discovery present
viper.Reset()
viper.Set("Federation.DirectorURL", "https://location2.example.com")
objectUrl.Host = ""
dUrl, err = getDirectorFromUrl(&objectUrl)
require.NoError(t, err)
assert.Equal(t, dUrl, "https://location2.example.com")

// Error if server has an error
viper.Reset()
returnError = true
viper.Set("TLSSkipVerify", true)
objectUrl.Host = serverURL.Host
_, err = getDirectorFromUrl(&objectUrl)
require.Error(t, err)

// Error if neither config nor hostname provided.
viper.Reset()
objectUrl.Host = ""
_, err = getDirectorFromUrl(&objectUrl)
require.Error(t, err)

// Error on unknown scheme
viper.Reset()
objectUrl.Scheme = "buzzard"
_, err = getDirectorFromUrl(&objectUrl)
require.Error(t, err)
}

func TestSharingUrl(t *testing.T) {
// Construct a local server that we can poke with QueryDirector
myUrl := "http://redirect.com"
myUrlRef := &myUrl
log.SetLevel(log.DebugLevel)
handler := func(w http.ResponseWriter, r *http.Request) {
issuerLoc := *myUrlRef + "/issuer"

if strings.HasPrefix(r.URL.Path, "/test") {
w.Header().Set("Location", *myUrlRef)
w.Header().Set("X-Pelican-Namespace", "namespace=/test, require-token=true")
w.Header().Set("X-Pelican-Authorization", fmt.Sprintf("issuer=%s", issuerLoc))
w.Header().Set("X-Pelican-Token-Generation", fmt.Sprintf("issuer=%s, base-path=/test, strategy=OAuth2", issuerLoc))
w.WriteHeader(http.StatusTemporaryRedirect)
} else if r.URL.Path == "/issuer/.well-known/openid-configuration" {
w.WriteHeader(http.StatusOK)
oidcConfig := fmt.Sprintf(`{"token_endpoint": "%s/token", "registration_endpoint": "%s/register", "grant_types_supported": ["urn:ietf:params:oauth:grant-type:device_code"], "device_authorization_endpoint": "%s/device_authz"}`, issuerLoc, issuerLoc, issuerLoc)
_, err := w.Write([]byte(oidcConfig))
assert.NoError(t, err)
} else if r.URL.Path == "/issuer/register" {
//requestBytes, err := io.ReadAll(r.Body)
//assert.NoError(t, err)
clientConfig := `{"client_id": "client1", "client_secret": "secret", "client_secret_expires_at": 0}`
w.WriteHeader(http.StatusCreated)
_, err := w.Write([]byte(clientConfig))
assert.NoError(t, err)
} else if r.URL.Path == "/issuer/device_authz" {
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"device_code": "1234", "user_code": "5678", "interval": 1, "verification_uri": "https://example.com", "expires_in": 20}`))
assert.NoError(t, err)
} else if r.URL.Path == "/issuer/token" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"access_token": "token1234", "token_type": "jwt"}`))
assert.NoError(t, err)
} else {
fmt.Println(r)
requestBytes, err := io.ReadAll(r.Body)
assert.NoError(t, err)
fmt.Println(string(requestBytes))
w.WriteHeader(http.StatusInternalServerError)
}
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
myUrl = server.URL

os.Setenv("PELICAN_SKIP_TERMINAL_CHECK", "password")
defer os.Unsetenv("PELICAN_SKIP_TERMINAL_CHECK")
viper.Set("Federation.DirectorURL", myUrl)
viper.Set("ConfigDir", t.TempDir())
err := config.InitClient()
assert.NoError(t, err)

// Call QueryDirector with the test server URL and a source path
testUrl, err := url.Parse("/test/foo/bar")
require.NoError(t, err)
token, err := CreateSharingUrl(testUrl, true)
assert.NoError(t, err)
assert.NotEmpty(t, token)
fmt.Println(token)
}
Loading
Loading