Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#958 from bbockelm/https_origin
Browse files Browse the repository at this point in the history
Add support for HTTP backend
  • Loading branch information
jhiemstrawisc authored Apr 9, 2024
2 parents e91131e + 72d4959 commit 9c00963
Show file tree
Hide file tree
Showing 32 changed files with 767 additions and 310 deletions.
4 changes: 4 additions & 0 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

namespaces "github.com/pelicanplatform/pelican/namespaces"
"github.com/pelicanplatform/pelican/utils"
Expand Down Expand Up @@ -123,6 +124,9 @@ func TestCreateNsFromDirectorResp(t *testing.T) {

func TestNewTransferDetailsUsingDirector(t *testing.T) {
os.Setenv("http_proxy", "http://proxy.edu:3128")
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("http_proxy"))
})

// Construct the input caches
// Cache with http
Expand Down
9 changes: 4 additions & 5 deletions client/fed_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
// Create a token file
issuer, err := config.GetServerIssuerURL()
require.NoError(t, err)
audience := config.GetServerAudience()

tokenConfig := token.NewWLCGToken()
tokenConfig.Lifetime = time.Minute
tokenConfig.Issuer = issuer
tokenConfig.Subject = "origin"
tokenConfig.AddAudiences(audience)
tokenConfig.AddAudienceAny()
tokenConfig.AddResourceScopes(token_scopes.NewResourceScope(token_scopes.Storage_Read, "/"),
token_scopes.NewResourceScope(token_scopes.Storage_Modify, "/"))
token, err := tokenConfig.CreateToken()
Expand Down Expand Up @@ -114,7 +113,7 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
_, err := config.SetPreferredPrefix("PELICAN")
assert.NoError(t, err)

for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
Expand Down Expand Up @@ -199,7 +198,7 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
t.Run("testOsdfRecursiveGetAndPutOsdfURL", func(t *testing.T) {
_, err := config.SetPreferredPrefix("OSDF")
assert.NoError(t, err)
for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
Expand Down Expand Up @@ -293,7 +292,7 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
_, err := config.SetPreferredPrefix("OSDF")
assert.NoError(t, err)

for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
Expand Down
31 changes: 14 additions & 17 deletions client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ func TestGetAndPutAuth(t *testing.T) {

issuer, err := config.GetServerIssuerURL()
require.NoError(t, err)
audience := config.GetServerAudience()

// Create a token file
tokenConfig := token.NewWLCGToken()
tokenConfig.Lifetime = time.Minute
tokenConfig.Issuer = issuer
tokenConfig.Subject = "origin"
tokenConfig.AddAudiences(audience)
tokenConfig.AddAudienceAny()

scopes := []token_scopes.TokenScope{}
readScope, err := token_scopes.Storage_Read.Path("/")
Expand All @@ -108,7 +107,7 @@ func TestGetAndPutAuth(t *testing.T) {
assert.NoError(t, err)

// Set path for object to upload/download
for _, export := range *fed.Exports {
for _, export := range fed.Exports {
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
Expand All @@ -135,7 +134,7 @@ func TestGetAndPutAuth(t *testing.T) {
_, err := config.SetPreferredPrefix("OSDF")
assert.NoError(t, err)

for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
Expand Down Expand Up @@ -163,7 +162,7 @@ func TestGetAndPutAuth(t *testing.T) {
_, err := config.SetPreferredPrefix("OSDF")
assert.NoError(t, err)

for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
Expand Down Expand Up @@ -219,14 +218,13 @@ func TestCopyAuth(t *testing.T) {

issuer, err := config.GetServerIssuerURL()
require.NoError(t, err)
audience := config.GetServerAudience()

// Create a token file
tokenConfig := token.NewWLCGToken()
tokenConfig.Lifetime = time.Minute
tokenConfig.Issuer = issuer
tokenConfig.Subject = "origin"
tokenConfig.AddAudiences(audience)
tokenConfig.AddAudienceAny()

scopes := []token_scopes.TokenScope{}
readScope, err := token_scopes.Storage_Read.Path("/")
Expand All @@ -253,7 +251,7 @@ func TestCopyAuth(t *testing.T) {
assert.NoError(t, err)

// Set path for object to upload/download
for _, export := range *fed.Exports {
for _, export := range fed.Exports {
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
Expand All @@ -280,7 +278,7 @@ func TestCopyAuth(t *testing.T) {
_, err := config.SetPreferredPrefix("OSDF")
assert.NoError(t, err)

for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
Expand Down Expand Up @@ -308,7 +306,7 @@ func TestCopyAuth(t *testing.T) {
_, err := config.SetPreferredPrefix("OSDF")
assert.NoError(t, err)

for _, export := range *fed.Exports {
for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
Expand Down Expand Up @@ -351,11 +349,10 @@ func TestCopyAuth(t *testing.T) {
func TestGetPublicRead(t *testing.T) {
viper.Reset()
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, bothPublicOriginCfg)

t.Run("testPubObjGet", func(t *testing.T) {
for _, export := range *fed.Exports {
for _, export := range fed.Exports {
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(export.StoragePrefix, "test.txt"))
Expand Down Expand Up @@ -398,7 +395,7 @@ func TestStatHttp(t *testing.T) {
t.Run("testStatHttpPelicanScheme", func(t *testing.T) {
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(((*fed.Exports)[0]).StoragePrefix, "test.txt"))
tempFile, err := os.Create(filepath.Join(fed.Exports[0].StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
Expand All @@ -410,7 +407,7 @@ func TestStatHttp(t *testing.T) {
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
((*fed.Exports)[0]).FederationPrefix, fileName)
fed.Exports[0].FederationPrefix, fileName)

log.Errorln(uploadURL)

Expand All @@ -427,7 +424,7 @@ func TestStatHttp(t *testing.T) {
assert.NoError(t, err)
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(((*fed.Exports)[0]).StoragePrefix, "test.txt"))
tempFile, err := os.Create(filepath.Join(fed.Exports[0].StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
Expand All @@ -438,7 +435,7 @@ func TestStatHttp(t *testing.T) {
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)

uploadURL := fmt.Sprintf("osdf://%s/%s", ((*fed.Exports)[0]).FederationPrefix, fileName)
uploadURL := fmt.Sprintf("osdf://%s/%s", fed.Exports[0].FederationPrefix, fileName)
hostname := fmt.Sprintf("%v:%v", param.Server_WebHost.GetString(), param.Server_WebPort.GetInt())

// Set our metadata values in config since that is what this url scheme - prefix combo does in handle_http
Expand All @@ -460,7 +457,7 @@ func TestStatHttp(t *testing.T) {
t.Run("testStatHttpIncorrectScheme", func(t *testing.T) {
testFileContent := "test file content"
// Drop the testFileContent into the origin directory
tempFile, err := os.Create(filepath.Join(((*fed.Exports)[0]).StoragePrefix, "test.txt"))
tempFile, err := os.Create(filepath.Join(fed.Exports[0].StoragePrefix, "test.txt"))
assert.NoError(t, err, "Error creating temp file")
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
Expand Down
27 changes: 26 additions & 1 deletion client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func TestIsPort(t *testing.T) {
// TestNewTransferDetails checks the creation of transfer details
func TestNewTransferDetails(t *testing.T) {
os.Setenv("http_proxy", "http://proxy.edu:3128")
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("http_proxy"))
})

// Case 1: cache with http
testCache := namespaces.Cache{
Expand Down Expand Up @@ -120,6 +123,11 @@ func TestNewTransferDetails(t *testing.T) {
}

func TestNewTransferDetailsEnv(t *testing.T) {
os.Setenv("http_proxy", "http://proxy.edu:3128")
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("http_proxy"))
})

testCache := namespaces.Cache{
AuthEndpoint: "cache.edu:8443",
Endpoint: "cache.edu:8000",
Expand All @@ -129,10 +137,12 @@ func TestNewTransferDetailsEnv(t *testing.T) {
os.Setenv("OSG_DISABLE_PROXY_FALLBACK", "")
err := config.InitClient()
assert.Nil(t, err)
transfers := newTransferDetails(testCache, transferDetailsOptions{false, ""})
transfers := newTransferDetails(testCache, transferDetailsOptions{})
assert.Equal(t, 1, len(transfers))
assert.Equal(t, true, transfers[0].Proxy)

os.Unsetenv("http_proxy")

transfers = newTransferDetails(testCache, transferDetailsOptions{true, ""})
assert.Equal(t, 1, len(transfers))
assert.Equal(t, "https", transfers[0].Url.Scheme)
Expand Down Expand Up @@ -182,6 +192,12 @@ func TestSlowTransfers(t *testing.T) {
Endpoint: svr.URL,
Resource: "Cache",
}

os.Setenv("http_proxy", "http://proxy.edu:3128")
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("http_proxy"))
})

transfers := newTransferDetails(testCache, transferDetailsOptions{false, ""})
assert.Equal(t, 2, len(transfers))
assert.Equal(t, svr.URL, transfers[0].Url.String())
Expand Down Expand Up @@ -215,6 +231,10 @@ func TestSlowTransfers(t *testing.T) {
// Test stopped transfer
func TestStoppedTransfer(t *testing.T) {
os.Setenv("http_proxy", "http://proxy.edu:3128")
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("http_proxy"))
})

ctx, _, _ := test_utils.TestContext(context.Background(), t)

// Adjust down the timeouts
Expand Down Expand Up @@ -318,6 +338,11 @@ func TestTrailerError(t *testing.T) {

defer svr.Close()

os.Setenv("http_proxy", "http://proxy.edu:3128")
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("http_proxy"))
})

testCache := namespaces.Cache{
AuthEndpoint: svr.URL,
Endpoint: svr.URL,
Expand Down
73 changes: 73 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package main

import (
"errors"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
)

func main() {
Expand Down Expand Up @@ -59,6 +63,75 @@ func handleCLI(args []string) error {
return nil
}
err := Execute()
if errors.Is(err, server_utils.ErrInvalidOriginConfig) {
mode := param.Origin_StorageType.GetString()
backendType, _ := server_utils.ParseOriginStorageType(mode)
switch backendType {
case server_utils.OriginStoragePosix:
fmt.Fprintf(os.Stderr, `
Export information was not correct.
For POSIX, to specify exports via the command line, use:
-v /mnt/foo:/bar -v /mnt/test:/baz
to export the directories /mnt/foo and /mnt/test under the namespace prefixes /bar and /baz, respectively.
Alternatively, specify Origin.Exports in the parameters.yaml file:
Origin:
Exports:
- StoragePrefix: /mnt/foo
FederationPrefix: /bar
Capabilities: ["PublicReads", "Writes", "Listings"]
- StoragePrefix: /mnt/test
FederationPrefix: /baz
Capabilities: ["Writes"]
to export the directories /mnt/foo and /mnt/test under the namespace prefixes /bar and /baz, respectively (with listed permissions).
`)
case server_utils.OriginStorageS3:
fmt.Fprintf(os.Stderr, `
Export information was not correct.
To specify exports via the command line, use:
-v my-bucket:/my/prefix (REQUIRED --service-url https://my-s3-url.com) (REQUIRED --url-style <path or virtual>) \
(REQUIRED --region "my-region") (OPTIONAL --bucket-access-keyfile /path/to/access.key) \
(OPTIONAL --bucket-secret-keyfile /path/to/secret.key)
to export the S3 bucket under the namespace prefix /my/prefix.
Alternatively, specify Origin.Exports in the parameters.yaml file:
Origin:
StorageType: s3
S3UrlStyle: <path or virtual>
S3ServiceUrl: https://my-s3-url.com
S3Region: my-region
Exports:
- FederationPrefix: /my/prefix
S3Bucket: my-bucket
S3AccessKeyfile: /path/to/access.key
S3SecretKeyfile: /path/to/secret.key
Capabilities: ["PublicReads", "Writes", "Listings"]
to export the S3 bucket my-bucket from https://my-s3-url.com under the namespace prefix /my/prefix (with listed permissions).
`)
case server_utils.OriginStorageHTTPS:
fmt.Fprintf(os.Stderr, `
Export information was not correct.
HTTPS exports must be specified via configuration file. Example:
Origin:
StorageType: https
FederationPrefix: /my/prefix
HttpServiceUrl: "https://example.com/testfiles"
Capabilities: ["PublicReads", "Writes", "Listings"]
`)
default:
fmt.Fprintf(os.Stderr, "Currently-supported origin modes include posix, https, and s3, but you provided %s.", mode)
}
}
if err != nil {
os.Exit(1)
}
Expand Down
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,16 @@ func InitServer(ctx context.Context, currentServers ServerType) error {
viper.SetDefault("Cache.Url", fmt.Sprintf("https://%v", param.Server_Hostname.GetString()))
}

if viper.GetString("Origin.StorageType") == "https" {
if viper.GetString("Origin.HTTPServiceUrl") == "" {
return errors.New("Origin.HTTPServiceUrl may not be empty")
}
_, err := url.Parse(viper.GetString("Origin.HTTPServiceUrl"))
if err != nil {
return errors.Wrap(err, "unable to parse Origin.HTTPServiceUrl as a URL")
}
}

if param.Cache_LowWatermark.IsSet() || param.Cache_HighWaterMark.IsSet() {
lowWmStr := param.Cache_LowWatermark.GetString()
highWmStr := param.Cache_HighWaterMark.GetString()
Expand Down
Loading

0 comments on commit 9c00963

Please sign in to comment.