Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1260 from haoming29/improv-topo-do…
Browse files Browse the repository at this point in the history
…wntime

Fetch topology downtime for cache servers
  • Loading branch information
jhiemstrawisc authored May 10, 2024
2 parents f8a8948 + 62e1d4c commit ee3ea0c
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 14 deletions.
55 changes: 54 additions & 1 deletion director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,66 @@ func parseServerAd(server utils.Server, serverType server_structs.ServerType) se
return serverAd
}

// findDownedTopologyCache returns the cache servers that are in downtime by
// subtracting the excludeDowned set from the includeDowned set.
//
// excludeDowned is the list of running OSDF topology servers.
// includeDowned is the list of running and downed OSDF topology servers.
//
// The result preserves the order of includeDowned. It is nil when every entry
// of includeDowned is also present in excludeDowned.
func findDownedTopologyCache(excludeDowned, includeDowned []utils.Server) (caches []utils.Server) {
	// Index the running servers once so that each membership check below is a
	// map lookup instead of a full scan of excludeDowned.
	running := make(map[utils.Server]struct{}, len(excludeDowned))
	for _, excluded := range excludeDowned {
		running[excluded] = struct{}{}
	}

	for _, included := range includeDowned {
		if _, ok := running[included]; !ok {
			caches = append(caches, included)
		}
	}
	return caches
}

// updateDowntimeFromTopology refreshes the topology-derived entries of
// filteredServers.
//
// excludedNss is the topology response without downed servers and includedNss
// is the response with them; the caches present only in the latter are treated
// as being in downtime. The write lock on filteredServersMutex is held for the
// whole update.
func updateDowntimeFromTopology(excludedNss, includedNss *utils.TopologyNamespacesJSON) {
	downedCaches := findDownedTopologyCache(excludedNss.Caches, includedNss.Caches)

	filteredServersMutex.Lock()
	defer filteredServersMutex.Unlock()

	// Drop every entry that an earlier topology fetch added, so that servers
	// which came back online are no longer filtered.
	for name, kind := range filteredServers {
		if kind != topoFiltered {
			continue
		}
		delete(filteredServers, name)
	}

	for i := range downedCaches {
		cache := downedCaches[i]
		// Key by the topology resource name unless the director already holds
		// an ad for this endpoint, in which case the ad's name is the key.
		key := cache.Resource
		if item := serverAds.Get(cache.Endpoint); item != nil {
			key = item.Value().Name
		}
		filteredServers[key] = topoFiltered
	}

	log.Infof("The following servers are put in downtime: %#v", filteredServers)
}

// Populate internal cache with origin/cache ads
func AdvertiseOSDF() error {
namespaces, err := utils.GetTopologyJSON()
namespaces, err := utils.GetTopologyJSON(false)
if err != nil {
return errors.Wrapf(err, "Failed to get topology JSON")
}

// Second call to fetch all servers (including servers in downtime)
includedNss, err := utils.GetTopologyJSON(true)
if err != nil {
return errors.Wrapf(err, "Failed to get topology JSON with server in downtime included (include_downed)")
}

updateDowntimeFromTopology(namespaces, includedNss)

cacheAdMap := make(map[server_structs.ServerAd][]server_structs.NamespaceAdV2)
originAdMap := make(map[server_structs.ServerAd][]server_structs.NamespaceAdV2)
tGen := server_structs.TokenGen{}
Expand Down
147 changes: 147 additions & 0 deletions director/advertise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,150 @@ func TestAdvertiseOSDF(t *testing.T) {
assert.Equal(t, oAds[0].AuthURL.String(), "https://origin2-auth-endpoint.com")
assert.Equal(t, cAds[0].URL.String(), "http://cache-endpoint.com")
}

// TestFindDownedTopologyCache checks that findDownedTopologyCache returns
// exactly the servers present in the "include downed" list but absent from the
// "exclude downed" list, in the order of the former.
func TestFindDownedTopologyCache(t *testing.T) {
	cacheA := utils.Server{AuthEndpoint: "cacheA.org:8443", Endpoint: "cacheA.org:8000", Resource: "CACHE_A"}
	cacheB := utils.Server{AuthEndpoint: "cacheB.org:8443", Endpoint: "cacheB.org:8000", Resource: "CACHE_B"}
	cacheC := utils.Server{AuthEndpoint: "cacheC.org:8443", Endpoint: "cacheC.org:8000", Resource: "CACHE_C"}
	cacheD := utils.Server{AuthEndpoint: "cacheD.org:8443", Endpoint: "cacheD.org:8000", Resource: "CACHE_D"}

	testCases := []struct {
		name     string
		running  []utils.Server // servers reported with downed ones excluded
		all      []utils.Server // servers reported with downed ones included
		expected []utils.Server // servers that should be detected as downed
	}{
		{
			name:    "empty-response",
			running: []utils.Server{},
			all:     []utils.Server{},
		},
		{
			name:    "no-downed-cache",
			running: []utils.Server{cacheA, cacheB, cacheC, cacheD},
			all:     []utils.Server{cacheA, cacheB, cacheC, cacheD},
		},
		{
			name:     "one-downed-cache",
			running:  []utils.Server{cacheA, cacheB, cacheC},
			all:      []utils.Server{cacheA, cacheB, cacheC, cacheD},
			expected: []utils.Server{cacheD},
		},
		{
			name:     "two-downed-cache",
			running:  []utils.Server{cacheB, cacheC},
			all:      []utils.Server{cacheA, cacheB, cacheC, cacheD},
			expected: []utils.Server{cacheA, cacheD},
		},
		{
			name:     "all-downed-cache",
			running:  []utils.Server{},
			all:      []utils.Server{cacheA, cacheB, cacheC, cacheD},
			expected: []utils.Server{cacheA, cacheB, cacheC, cacheD},
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got := findDownedTopologyCache(tc.running, tc.all)
			if len(tc.expected) == 0 {
				assert.Empty(t, got)
				return
			}
			require.Len(t, got, len(tc.expected))
			assert.EqualValues(t, tc.expected, got)
		})
	}
}

// TestUpdateDowntimeFromTopology checks that updateDowntimeFromTopology keeps
// the topology-filtered entries of filteredServers in sync with the downed
// caches across consecutive updates.
func TestUpdateDowntimeFromTopology(t *testing.T) {
	mockTopoCacheA := utils.Server{AuthEndpoint: "cacheA.org:8443", Endpoint: "cacheA.org:8000", Resource: "CACHE_A"}
	mockTopoCacheB := utils.Server{AuthEndpoint: "cacheB.org:8443", Endpoint: "cacheB.org:8000", Resource: "CACHE_B"}
	mockTopoCacheC := utils.Server{AuthEndpoint: "cacheC.org:8443", Endpoint: "cacheC.org:8000", Resource: "CACHE_C"}

	// The subtests overwrite the package-level filteredServers map. Put the
	// original map back afterwards so this test does not leak state into
	// other tests of the package.
	filteredServersMutex.Lock()
	savedFilteredServers := filteredServers
	filteredServersMutex.Unlock()
	t.Cleanup(func() {
		filteredServersMutex.Lock()
		defer filteredServersMutex.Unlock()
		filteredServers = savedFilteredServers
	})

	// resetFilteredServers gives each subtest a fresh, empty filter map.
	resetFilteredServers := func() {
		filteredServersMutex.Lock()
		defer filteredServersMutex.Unlock()
		filteredServers = map[string]filterType{}
	}

	// requireTopoFiltered asserts that filteredServers holds exactly the given
	// caches, keyed by resource name and marked as topoFiltered.
	requireTopoFiltered := func(t *testing.T, downed ...utils.Server) {
		t.Helper()
		filteredServersMutex.RLock()
		defer filteredServersMutex.RUnlock()
		assert.Len(t, filteredServers, len(downed))
		for _, cache := range downed {
			require.NotEmpty(t, filteredServers[cache.Resource])
			assert.Equal(t, topoFiltered, filteredServers[cache.Resource])
		}
	}

	t.Run("no-change-with-same-downtime", func(t *testing.T) {
		resetFilteredServers()
		updateDowntimeFromTopology(
			&utils.TopologyNamespacesJSON{},
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA, mockTopoCacheB}},
		)
		requireTopoFiltered(t, mockTopoCacheA, mockTopoCacheB)

		// second round of updates
		updateDowntimeFromTopology(
			&utils.TopologyNamespacesJSON{},
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA, mockTopoCacheB}},
		)
		// Same result
		requireTopoFiltered(t, mockTopoCacheA, mockTopoCacheB)
	})

	t.Run("one-server-back-online", func(t *testing.T) {
		resetFilteredServers()
		updateDowntimeFromTopology(
			&utils.TopologyNamespacesJSON{},
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA, mockTopoCacheB}},
		)
		requireTopoFiltered(t, mockTopoCacheA, mockTopoCacheB)

		// second round of updates
		updateDowntimeFromTopology(
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA}}, // A is back online
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA, mockTopoCacheB}},
		)
		requireTopoFiltered(t, mockTopoCacheB)
	})

	t.Run("one-more-server-in-downtime", func(t *testing.T) {
		resetFilteredServers()
		updateDowntimeFromTopology(
			&utils.TopologyNamespacesJSON{},
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA, mockTopoCacheB}},
		)
		requireTopoFiltered(t, mockTopoCacheA, mockTopoCacheB)

		// second round of updates
		updateDowntimeFromTopology(
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{}},
			&utils.TopologyNamespacesJSON{Caches: []utils.Server{mockTopoCacheA, mockTopoCacheB, mockTopoCacheC}},
		)
		requireTopoFiltered(t, mockTopoCacheA, mockTopoCacheB, mockTopoCacheC)
	})
}
24 changes: 21 additions & 3 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import (
// filterType identifies how a server came to be filtered or allowed; the
// recognized values are the constants declared below.
type filterType string

// Recognized filterType values. The trailing comment on each line names the
// source of that state.
const (
permFiltered filterType = "permFiltered" // Read from Director.FilteredServers
tempFiltered filterType = "tempFiltered" // Filtered by web UI
tempAllowed filterType = "tempAllowed" // Read from Director.FilteredServers but mutated by web UI
permFiltered filterType = "permFiltered" // Read from Director.FilteredServers
tempFiltered filterType = "tempFiltered" // Filtered by web UI, e.g. the server is put in downtime via the director website
topoFiltered filterType = "topologyFiltered" // Filtered by Topology, e.g. the server is put in downtime via the OSDF Topology change
tempAllowed filterType = "tempAllowed" // Read from Director.FilteredServers but mutated by web UI
)

var (
Expand All @@ -50,6 +51,23 @@ var (
filteredServersMutex = sync.RWMutex{}
)

// String returns a human-readable description of the filter type.
//
// The empty filterType maps to the empty string, which simplifies handling
// of the empty value on the UI side; any unrecognized value yields
// "Unknown Type".
func (f filterType) String() string {
	if f == "" {
		return ""
	}

	switch f {
	case tempAllowed:
		return "Temporarily enabled via the admin website"
	case topoFiltered:
		return "Disabled via the Topology policy"
	case tempFiltered:
		return "Temporarily disabled via the admin website"
	case permFiltered:
		return "Permanently Disabled via the director configuration"
	}
	return "Unknown Type"
}

func recordAd(ad server_structs.ServerAd, namespaceAds *[]server_structs.NamespaceAdV2) {
if err := updateLatLong(&ad); err != nil {
log.Debugln("Failed to lookup GeoIP coordinates for host", ad.URL.Host)
Expand Down
2 changes: 2 additions & 0 deletions director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func checkFilter(serverName string) (bool, filterType) {
return true, permFiltered
case tempFiltered:
return true, tempFiltered
case topoFiltered:
return true, topoFiltered
case tempAllowed:
return false, tempAllowed
default:
Expand Down
10 changes: 8 additions & 2 deletions director/director_ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type (
DirectReads bool `json:"enableFallbackRead"`
Listings bool `json:"enableListing"`
Filtered bool `json:"filtered"`
FilteredType filterType `json:"filteredType"`
FilteredType string `json:"filteredType"`
Status HealthTestStatus `json:"status"`
NamespacePrefixes []string `json:"namespacePrefixes"`
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func listServers(ctx *gin.Context) {
DirectReads: server.DirectReads,
Listings: server.Listings,
Filtered: filtered,
FilteredType: ft,
FilteredType: ft.String(),
Status: healthStatus,
}
for _, ns := range server.NamespaceAds {
Expand Down Expand Up @@ -268,6 +268,12 @@ func handleAllowServer(ctx *gin.Context) {
} else if ft == permFiltered {
// For servers to filter from the config, temporarily allow the server
filteredServers[sn] = tempAllowed
} else if ft == topoFiltered {
ctx.JSON(http.StatusBadRequest, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: fmt.Sprintf("Can't allow server %s that is disabled by the OSG Topology. Contact OSG admin at support@osg-htc.org to enable the server.", sn),
})
return
}
ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{Status: server_structs.RespOK, Msg: "success"})
}
Expand Down
12 changes: 6 additions & 6 deletions launchers/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group
log.Info("Initializing Director GeoIP database...")
director.InitializeDB(ctx)

director.ConfigFilterdServers()

director.LaunchTTLCache(ctx, egrp)

director.LaunchMapMetrics(ctx, egrp)

if config.GetPreferredPrefix() == config.OsdfPrefix {
metrics.SetComponentHealthStatus(metrics.DirectorRegistry_Topology, metrics.StatusWarning, "Start requesting from topology, status unknown")
log.Info("Generating/advertising server ads from OSG topology service...")
Expand All @@ -51,12 +57,6 @@ func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group
go director.PeriodicCacheReload(ctx)
}

director.LaunchTTLCache(ctx, egrp)

director.LaunchMapMetrics(ctx, egrp)

director.ConfigFilterdServers()

// Configure the shortcut middleware to either redirect to a cache
// or to an origin
defaultResponse := param.Director_DefaultResponse.GetString()
Expand Down
2 changes: 1 addition & 1 deletion registry/registry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func PopulateTopology() error {
}

// Next, get the values from topology
namespaces, err := utils.GetTopologyJSON()
namespaces, err := utils.GetTopologyJSON(false)
if err != nil {
return errors.Wrapf(err, "Failed to get topology JSON")
}
Expand Down
5 changes: 4 additions & 1 deletion utils/web_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,15 @@ func MakeRequest(ctx context.Context, url string, method string, data map[string
}

// GetTopologyJSON returns the namespaces and caches from OSDF topology
func GetTopologyJSON() (*TopologyNamespacesJSON, error) {
func GetTopologyJSON(includeDowned bool) (*TopologyNamespacesJSON, error) {
topoNamespaceUrl := param.Federation_TopologyNamespaceUrl.GetString()
if topoNamespaceUrl == "" {
metrics.SetComponentHealthStatus(metrics.DirectorRegistry_Topology, metrics.StatusCritical, "Topology namespaces.json configuration option (`Federation.TopologyNamespaceURL`) not set")
return nil, errors.New("Topology namespaces.json configuration option (`Federation.TopologyNamespaceURL`) not set")
}
if includeDowned {
topoNamespaceUrl += "?include_downed=1"
}

req, err := http.NewRequest(http.MethodGet, topoNamespaceUrl, nil)
if err != nil {
Expand Down

0 comments on commit ee3ea0c

Please sign in to comment.