Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#960 from haoming29/dir-based-cache…
Browse files Browse the repository at this point in the history
…-tests

Director based cache health tests
  • Loading branch information
jhiemstrawisc authored Mar 26, 2024
2 parents adf0924 + 8adf997 commit e60b5d3
Show file tree
Hide file tree
Showing 82 changed files with 3,386 additions and 2,942 deletions.
4 changes: 2 additions & 2 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/registry"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/test_utils"
"github.com/pelicanplatform/pelican/token_scopes"
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestRetrieveTimeout(t *testing.T) {
err = json.Unmarshal(responseBytes, &brokerResp)
require.NoError(t, err)

assert.Equal(t, common.RespPollTimeout, brokerResp.Status)
assert.Equal(t, server_structs.RespPollTimeout, brokerResp.Status)

ctx, cancelFunc := context.WithTimeout(ctx, 50*time.Millisecond)
defer cancelFunc()
Expand Down
16 changes: 8 additions & 8 deletions broker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ import (
"sync/atomic"
"time"

"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/token_scopes"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -240,7 +240,7 @@ func ConnectToOrigin(ctx context.Context, brokerUrl, prefix, originName string)
err = errors.Wrap(err, "Failure when reading response from broker response")
}
if resp.StatusCode >= 400 {
errResp := common.SimpleApiResp{}
errResp := server_structs.SimpleApiResp{}
log.Errorf("Failure (status code %d) when invoking the broker: %s", resp.StatusCode, string(responseBytes))
if err = json.Unmarshal(responseBytes, &errResp); err != nil {
err = errors.Errorf("Failure when invoking the broker (status code %d); unable to parse error message", resp.StatusCode)
Expand Down Expand Up @@ -299,7 +299,7 @@ func ConnectToOrigin(ctx context.Context, brokerUrl, prefix, originName string)
hj, ok := writer.(http.Hijacker)
if !ok {
log.Debug("Not able to hijack underlying TCP connection from server")
resp := common.SimpleApiResp{
resp := server_structs.SimpleApiResp{
Msg: "Unable to reverse TCP connection; HTTP/2 in use",
Status: "error",
}
Expand Down Expand Up @@ -470,7 +470,7 @@ func doCallback(ctx context.Context, brokerResp reversalRequest) (listener net.L
}

if resp.StatusCode >= 400 {
errResp := common.SimpleApiResp{}
errResp := server_structs.SimpleApiResp{}
if err = json.Unmarshal(responseBytes, &errResp); err != nil {
err = errors.Errorf("Failure when invoking cache %s callback (status code %d); unable to parse error message", brokerResp.CallbackUrl, resp.StatusCode)
} else {
Expand Down Expand Up @@ -617,7 +617,7 @@ func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, resultChan
break
}
if resp.StatusCode >= 400 {
errResp := common.SimpleApiResp{}
errResp := server_structs.SimpleApiResp{}
if err = json.Unmarshal(responseBytes, &errResp); err != nil {
log.Errorf("Failure when invoking the broker (status code %d); unable to parse error message", resp.StatusCode)
} else {
Expand All @@ -634,17 +634,17 @@ func LaunchRequestMonitor(ctx context.Context, egrp *errgroup.Group, resultChan
break
}

if brokerResp.Status == common.RespOK {
if brokerResp.Status == server_structs.RespOK {
listener, err := doCallback(ctx, brokerResp.Request)
if err != nil {
log.Errorln("Failed to callback to the cache:", err)
resultChan <- err
break
}
resultChan <- listener
} else if brokerResp.Status == common.RespFailed {
} else if brokerResp.Status == server_structs.RespFailed {
log.Errorln("Broker responded to origin retrieve with an error:", brokerResp.Msg)
} else if brokerResp.Status != common.RespPollTimeout { // We expect timeouts; do not log them.
} else if brokerResp.Status != server_structs.RespPollTimeout { // We expect timeouts; do not log them.
if brokerResp.Msg != "" {
log.Errorf("Broker responded with unknown status (%s); msg: %s", brokerResp.Status, brokerResp.Msg)
} else {
Expand Down
14 changes: 7 additions & 7 deletions broker/server_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/token_scopes"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand All @@ -42,7 +42,7 @@ type (

// Response for a successful retrieval
brokerRetrievalResp struct {
common.SimpleApiResp
server_structs.SimpleApiResp
Request reversalRequest `json:"req"`
}

Expand All @@ -54,19 +54,19 @@ type (

func newBrokerReqResp(req reversalRequest) (result brokerRetrievalResp) {
result.Request = req
result.SimpleApiResp.Status = common.RespOK
result.SimpleApiResp.Status = server_structs.RespOK
return
}

func newBrokerRespFail(msg string) common.SimpleApiResp {
return common.SimpleApiResp{
Status: common.RespFailed,
func newBrokerRespFail(msg string) server_structs.SimpleApiResp {
return server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: msg,
}
}

func newBrokerRespTimeout() (result brokerRetrievalResp) {
result.SimpleApiResp.Status = common.RespPollTimeout
result.SimpleApiResp.Status = server_structs.RespPollTimeout
return
}

Expand Down
22 changes: 10 additions & 12 deletions cache_ui/advertise.go → cache/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,30 @@
*
***************************************************************/

package cache_ui
package cache

import (
"context"
"encoding/json"
"net/url"
"strings"

"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/director"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/utils"
"github.com/pkg/errors"
)

type (
CacheServer struct {
server_utils.NamespaceHolder
server_structs.NamespaceHolder
namespaceFilter map[string]struct{}
}
)

func (server *CacheServer) CreateAdvertisement(name string, originUrl string, originWebUrl string) (*common.OriginAdvertiseV2, error) {
ad := common.OriginAdvertiseV2{
func (server *CacheServer) CreateAdvertisement(name string, originUrl string, originWebUrl string) (*server_structs.OriginAdvertiseV2, error) {
ad := server_structs.OriginAdvertiseV2{
Name: name,
DataURL: originUrl,
WebURL: originWebUrl,
Expand All @@ -68,13 +66,13 @@ func (server *CacheServer) SetFilters() {
}
}

func (server *CacheServer) filterAdsBasedOnNamespace(nsAds []common.NamespaceAdV2) []common.NamespaceAdV2 {
func (server *CacheServer) filterAdsBasedOnNamespace(nsAds []server_structs.NamespaceAdV2) []server_structs.NamespaceAdV2 {
/*
* Filters out ads based on the namespaces listed in server.NamespaceFilter
* Note that this does a few checks for trailing and non-trailing "/" as it's assumed that the namespaces
* from the director and the ones provided might differ.
*/
filteredAds := []common.NamespaceAdV2{}
filteredAds := []server_structs.NamespaceAdV2{}
if len(server.namespaceFilter) > 0 {
for _, ad := range nsAds {
ns := ad.Path
Expand Down Expand Up @@ -111,7 +109,7 @@ func (server *CacheServer) filterAdsBasedOnNamespace(nsAds []common.NamespaceAdV

func (server *CacheServer) GetNamespaceAdsFromDirector() error {
// Get the endpoint of the director
var respNS []common.NamespaceAdV2
var respNS []server_structs.NamespaceAdV2

directorEndpoint := param.Federation_DirectorUrl.GetString()
if directorEndpoint == "" {
Expand Down Expand Up @@ -140,14 +138,14 @@ func (server *CacheServer) GetNamespaceAdsFromDirector() error {
return err
}
respData, err = utils.MakeRequest(context.Background(), directorNSListEndpointURL, "GET", nil, nil)
var respNSV1 []common.NamespaceAdV1
var respNSV1 []server_structs.NamespaceAdV1
if err != nil {
return errors.Wrap(err, "Failed to make request")
} else {
if jsonErr := json.Unmarshal(respData, &respNSV1); jsonErr == nil { // Error creating json
return errors.Wrapf(err, "Failed to make request: %v", err)
}
respNS = director.ConvertNamespaceAdsV1ToV2(respNSV1, nil)
respNS = server_structs.ConvertNamespaceAdsV1ToV2(respNSV1, nil)
}
} else {
return errors.Wrap(err, "Failed to make request")
Expand Down
6 changes: 3 additions & 3 deletions cache_ui/advertise_test.go → cache/advertise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*
***************************************************************/

package cache_ui
package cache

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestFilterNsAdsForCache(t *testing.T) {
viper.Reset()
defer viper.Reset()

nsAds := []common.NamespaceAdV2{
nsAds := []server_structs.NamespaceAdV2{
{
Path: "/ns1",
},
Expand Down
2 changes: 1 addition & 1 deletion cache_ui/broker_client.go → cache/broker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
***************************************************************/

package cache_ui
package cache

import (
"bufio"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*
***************************************************************/

package cache_ui
package cache

import (
"context"
Expand Down
86 changes: 86 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package cache

import (
"context"
"errors"
"os"
"path/filepath"
"time"

"github.com/gin-gonic/gin"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_utils"
"golang.org/x/sync/errgroup"
)

var (
notificationChan = make(chan bool)
)

func RegisterCacheAPI(router *gin.Engine, ctx context.Context, egrp *errgroup.Group) {
// start the timer for the director test report timeout
server_utils.LaunchPeriodicDirectorTimeout(ctx, egrp, notificationChan)

group := router.Group("/api/v1.0/cache")
{
group.POST("/directorTest", func(ginCtx *gin.Context) { server_utils.HandleDirectorTestResponse(ginCtx, notificationChan) })
}
}

// Periodically scan the /<runLocation>/pelican/monitoring directory to clean up test files
func LaunchDirectorTestFileCleanup(ctx context.Context) {
server_utils.LaunchWatcherMaintenance(ctx,
[]string{filepath.Join(param.Cache_DataLocation.GetString(), "pelican", "monitoring")},
"cache director-based health test clean up",
time.Minute,
func(notifyEvent bool) error {
// We run this function regardless of notifyEvent to do the cleanup
dirPath := filepath.Join(param.Cache_DataLocation.GetString(), "pelican", "monitoring")
dirInfo, err := os.Stat(dirPath)
if err != nil {
return err
} else {
if !dirInfo.IsDir() {
return errors.New("monitoring path is not a directory: " + dirPath)
}
}
dirItems, err := os.ReadDir(dirPath)
if err != nil {
return err
}
if len(dirItems) <= 2 { // At mininum there are the test file and .cinfo file, and we don't want to remove the last two
return nil
}
for idx, item := range dirItems {
// For all but the latest two files (test file and its .cinfo file)
// os.ReadDir sorts dirEntries in order of file names. Since our test file names are timestamped and is string comparable,
// the last two files should be the latest test files, which we want to keep
if idx < len(dirItems)-2 {
err := os.Remove(filepath.Join(dirPath, item.Name()))
if err != nil {
return err
}
}
}
return nil
},
)
}
20 changes: 19 additions & 1 deletion cache_ui/cinfo.go → cache/cinfo.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
package cache_ui
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package cache

import (
"bytes"
Expand Down
20 changes: 19 additions & 1 deletion cache_ui/self_monitor.go → cache/self_monitor.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
package cache_ui
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package cache

import (
"context"
Expand Down
Loading

0 comments on commit e60b5d3

Please sign in to comment.