Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix error responses for local cache #1041

Merged
merged 7 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cache/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
CacheServer struct {
server_structs.NamespaceHolder
namespaceFilter map[string]struct{}
pids []int
}
)

Expand All @@ -49,6 +50,17 @@ func (server *CacheServer) CreateAdvertisement(name string, originUrl string, or
return &ad, nil
}

func (server *CacheServer) SetPids(pids []int) {
server.pids = make([]int, len(pids))
copy(server.pids, pids)
}

func (server *CacheServer) GetPids() (pids []int) {
pids = make([]int, len(server.pids))
copy(pids, server.pids)
return
}

func (server *CacheServer) SetFilters() {
/*
* Converts the list of permitted namespaces to a set and stores it for the serve
Expand Down
55 changes: 49 additions & 6 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ type (
Err error
}

// StatusCodeError is a wrapper around grab.StatusCodeErorr that indicates the server returned
// a non-200 code.
//
// The wrapper is done to provide a Pelican-based error hierarchy in case we ever decide to have
// a different underlying download package.
StatusCodeError grab.StatusCodeError

// Represents the results of a single object transfer,
// potentially across multiple attempts / retries.
TransferResults struct {
Expand Down Expand Up @@ -351,6 +358,18 @@ func (e *ConnectionSetupError) Is(target error) bool {
return ok
}

func (e *StatusCodeError) Error() string {
return (*grab.StatusCodeError)(e).Error()
}

func (e *StatusCodeError) Is(target error) bool {
sce, ok := target.(*StatusCodeError)
if !ok {
return false
}
return int(*sce) == int(*e)
}

// hasPort test the host if it includes a port
func hasPort(host string) bool {
var checkPort = regexp.MustCompile("^.*:[0-9]+$")
Expand Down Expand Up @@ -864,7 +883,7 @@ func (te *TransferEngine) runJobHandler() error {
//
// The returned object can be further customized as desired.
// This function does not "submit" the job for execution.
func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, upload bool, recursive bool, project string, options ...TransferOption) (tj *TransferJob, err error) {
func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, recursive bool, project string, options ...TransferOption) (tj *TransferJob, err error) {

id, err := uuid.NewV7()
if err != nil {
Expand All @@ -891,7 +910,19 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
token: tc.token,
project: project,
}
tj.ctx, tj.cancel = context.WithCancel(tc.ctx)

mergeCancel := func(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
newCtx, cancel := context.WithCancel(ctx1)
stop := context.AfterFunc(ctx2, func() {
cancel()
})
return newCtx, func() {
stop()
cancel()
}
}

tj.ctx, tj.cancel = mergeCancel(ctx, tc.ctx)

for _, option := range options {
switch option.Ident() {
Expand Down Expand Up @@ -1438,7 +1469,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
errorString += "due to proxy " + AddrString + " error: " + ope.Unwrap().Error()
} else if errors.As(err, &cse) {
errorString += "+ proxy=" + strconv.FormatBool(transferEndpoint.Proxy) + ": "
if sce, ok := cse.Unwrap().(grab.StatusCodeError); ok {
if sce, ok := cse.Unwrap().(*StatusCodeError); ok {
errorString += sce.Error()
} else {
errorString += err.Error()
Expand Down Expand Up @@ -1543,6 +1574,12 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
return 0, 0, "", errors.New("Internal error: implementation is not a http.Client type")
}
httpClient.Transport = transport
headerTimeout := transport.ResponseHeaderTimeout
if headerTimeout > time.Second {
headerTimeout -= 500 * time.Millisecond
} else {
headerTimeout /= 2
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -1578,7 +1615,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
}
// Set the headers
req.HTTPRequest.Header.Set("X-Transfer-Status", "true")
req.HTTPRequest.Header.Set("X-Pelican-Timeout", param.Transport_ResponseHeaderTimeout.GetDuration().String())
req.HTTPRequest.Header.Set("X-Pelican-Timeout", headerTimeout.Round(time.Millisecond).String())
req.HTTPRequest.Header.Set("TE", "trailers")
req.HTTPRequest.Header.Set("User-Agent", getUserAgent(project))
req = req.WithContext(ctx)
Expand All @@ -1599,11 +1636,17 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
// Check the error real quick
if resp.IsComplete() {
if err = resp.Err(); err != nil {
var sce grab.StatusCodeError
if errors.Is(err, grab.ErrBadLength) {
err = fmt.Errorf("Local copy of file is larger than remote copy %w", grab.ErrBadLength)
err = fmt.Errorf("local copy of file is larger than remote copy %w", grab.ErrBadLength)
} else if errors.As(err, &sce) {
log.Debugln("Creating a client status code error")
sce2 := StatusCodeError(sce)
err = &sce2
} else {
err = &ConnectionSetupError{Err: err}
}
log.Errorln("Failed to download:", err)
err = &ConnectionSetupError{Err: err}
return
}
}
Expand Down
4 changes: 3 additions & 1 deletion client/handle_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,9 @@ func TestSortAttempts(t *testing.T) {

func TestTimeoutHeaderSetForDownload(t *testing.T) {
viper.Reset()
config.InitConfig()
viper.Set("Transport.ResponseHeaderTimeout", 10*time.Second)
require.NoError(t, config.InitClient())
ctx, _, _ := test_utils.TestContext(context.Background(), t)

// We have this flag because our server will get a few requests throughout its lifetime and the other
Expand All @@ -470,7 +472,7 @@ func TestTimeoutHeaderSetForDownload(t *testing.T) {
if r.Header.Get("X-Pelican-Timeout") == "" {
t.Error("X-Pelican-Timeout header is not set")
}
assert.Equal(t, "10s", r.Header.Get("X-Pelican-Timeout"))
assert.Equal(t, "9.5s", r.Header.Get("X-Pelican-Timeout"))
timeoutHeaderFound = true
}
}))
Expand Down
6 changes: 3 additions & 3 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func DoPut(ctx context.Context, localObject string, remoteDestination string, re
if err != nil {
return
}
tj, err := client.NewTransferJob(remoteDestUrl, localObject, true, recursive, project)
tj, err := client.NewTransferJob(context.Background(), remoteDestUrl, localObject, true, recursive, project)
if err != nil {
return
}
Expand Down Expand Up @@ -559,7 +559,7 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re
if err != nil {
return
}
tj, err := tc.NewTransferJob(remoteObjectUrl, localDestination, false, recursive, project)
tj, err := tc.NewTransferJob(context.Background(), remoteObjectUrl, localDestination, false, recursive, project)
if err != nil {
return
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv
if err != nil {
return
}
tj, err := tc.NewTransferJob(remoteURL, localPath, isPut, recursive, project)
tj, err := tc.NewTransferJob(context.Background(), remoteURL, localPath, isPut, recursive, project)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func serveCache(cmd *cobra.Command, _ []string) error {
cancel, err := launchers.LaunchModules(cmd.Context(), config.CacheType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.CacheType)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func serveDirector(cmd *cobra.Command, args []string) error {
modules.Set(config.BrokerType)
}

cancel, err := launchers.LaunchModules(cmd.Context(), config.DirectorType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.DirectorType)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func fedServeStart(cmd *cobra.Command, args []string) error {
}
}

cancel, err := launchers.LaunchModules(cmd.Context(), modules)
_, cancel, err := launchers.LaunchModules(cmd.Context(), modules)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestFedServeCache(t *testing.T) {

require.NoError(t, err)

fedCancel, err := launchers.LaunchModules(ctx, modules)
_, fedCancel, err := launchers.LaunchModules(ctx, modules)
defer fedCancel()
if err != nil {
log.Errorln("Failure in fedServeInternal:", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/fed_serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestFedServePosixOrigin(t *testing.T) {
viper.Set("Registry.RequireCacheApproval", false)
defer cancel()

fedCancel, err := launchers.LaunchModules(ctx, modules)
_, fedCancel, err := launchers.LaunchModules(ctx, modules)

defer fedCancel()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func serveOrigin(cmd *cobra.Command, args []string) error {
cancel, err := launchers.LaunchModules(cmd.Context(), config.OriginType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.OriginType)
if err != nil {
cancel()
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func runPluginWorker(ctx context.Context, upload bool, workChan <-chan PluginTra
var tj *client.TransferJob
urlCopy := *transfer.url
project := client.GetProjectName()
tj, err = tc.NewTransferJob(&urlCopy, transfer.localFile, upload, false, project, client.WithAcquireToken(false), client.WithCaches(caches...))
tj, err = tc.NewTransferJob(context.Background(), &urlCopy, transfer.localFile, upload, false, project, client.WithAcquireToken(false), client.WithCaches(caches...))
if err != nil {
return errors.Wrap(err, "Failed to create new transfer job")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (f *FedTest) Spinup() {
viper.Set("Registry.RequireOriginApproval", false)
viper.Set("Registry.RequireCacheApproval", false)

f.FedCancel, err = launchers.LaunchModules(ctx, modules)
_, f.FedCancel, err = launchers.LaunchModules(ctx, modules)
if err != nil {
f.T.Fatalf("Failure in fedServeInternal: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/registry_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func serveRegistry(cmd *cobra.Command, _ []string) error {
cancel, err := launchers.LaunchModules(cmd.Context(), config.RegistryType)
_, cancel, err := launchers.LaunchModules(cmd.Context(), config.RegistryType)
if err != nil {
cancel()
}
Expand Down
14 changes: 9 additions & 5 deletions daemon/launch_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,24 @@ func (launcher DaemonLauncher) Launch(ctx context.Context) (context.Context, int
return ctx_result, cmd.Process.Pid, nil
}

func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Group) (err error) {
func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Group) (pids []int, err error) {

daemons := make([]launchInfo, len(launchers))
pids = make([]int, len(launchers))
for idx, daemon := range launchers {
ctx, pid, err := daemon.Launch(ctx)
var newCtx context.Context
var pid int
newCtx, pid, err = daemon.Launch(ctx)
if err != nil {
err = errors.Wrapf(err, "Failed to launch %s daemon", daemon.Name())
// This is secure as long as deamon.Name() is either "xrootd" or "cmsd"
metrics.SetComponentHealthStatus(metrics.HealthStatusComponent(daemon.Name()), metrics.StatusCritical, err.Error())
return err
return
}
daemons[idx].ctx = ctx
daemons[idx].ctx = newCtx
daemons[idx].pid = pid
daemons[idx].name = daemon.Name()
pids[idx] = pid
log.Infoln("Successfully launched", daemon.Name())
metrics.SetComponentHealthStatus(metrics.HealthStatusComponent(daemon.Name()), metrics.StatusOK, "")
}
Expand Down Expand Up @@ -257,5 +261,5 @@ func LaunchDaemons(ctx context.Context, launchers []Launcher, egrp *errgroup.Gro
}
})

return nil
return
}
8 changes: 7 additions & 1 deletion fed_test_utils/fed.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type (
Token string
Ctx context.Context
Egrp *errgroup.Group
Pids []int
}
)

Expand Down Expand Up @@ -139,9 +140,14 @@ func NewFedTest(t *testing.T, originConfig string) (ft *FedTest) {
err = config.InitServer(ctx, modules)
require.NoError(t, err)

_, err = launchers.LaunchModules(ctx, modules)
servers, _, err := launchers.LaunchModules(ctx, modules)
require.NoError(t, err)

ft.Pids = make([]int, 0, 2)
for _, server := range servers {
ft.Pids = append(ft.Pids, server.GetPids()...)
}

desiredURL := param.Server_ExternalWebUrl.GetString() + "/api/v1.0/health"
err = server_utils.WaitUntilWorking(ctx, "GET", desiredURL, "director", 200)
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
return nil, err
}

if err = daemon.LaunchDaemons(ctx, launchers, egrp); err != nil {
pids, err := daemon.LaunchDaemons(ctx, launchers, egrp)
if err != nil {
return nil, err
}

cacheServer.SetPids(pids)
return cacheServer, nil
}

Expand Down
Loading
Loading