Skip to content

Commit

Permalink
skipper: update metrics from kube metadata route
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Yastrebov <alexander.yastrebov@zalando.de>
  • Loading branch information
AlexanderYastrebov committed Jan 11, 2024
1 parent fa772eb commit 0c3dd2f
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 3 deletions.
3 changes: 2 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
const (
DefaultLoadBalancerAlgorithm = "roundRobin"
MetadataRouteID = "kube__metadata"
EnableMetadataRoute = false // TODO: flag
)

const (
Expand Down Expand Up @@ -440,7 +441,7 @@ func (c *Client) loadAndConvert() ([]*eskip.Route, error) {
r = append(r, globalRedirectRoute(c.httpsRedirectCode))
}

if false { // TODO: flag
if EnableMetadataRoute {
r = append(r, metadataRoute(state))
}

Expand Down
90 changes: 88 additions & 2 deletions dataclients/kubernetes/metadataroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package kubernetes

import (
"encoding/json"
"net"
"strings"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/predicates"
"github.com/zalando/skipper/routing"
)

type MetadataPostProcessorOptions struct {
EndpointRegistry *routing.EndpointRegistry
}

type postProcessor struct {
options *MetadataPostProcessorOptions
}

type kubeRouteMetadata struct {
Addresses []kubeRouteMetadataAddress `json:"addresses"`
}
Expand All @@ -19,6 +30,39 @@ type kubeRouteMetadataAddress struct {
TargetRef *objectReference `json:"targetRef"`
}

// NewMetadataPostProcessor creates post-processor for metadata route
func NewMetadataPostProcessor(options MetadataPostProcessorOptions) routing.PostProcessor {
return &postProcessor{options: &options}
}

func (pp *postProcessor) Do(routes []*routing.Route) []*routing.Route {
var metadata map[string]*kubeRouteMetadataAddress
for _, r := range routes {
if r.Id == MetadataRouteID {
metadata = parseMetadata(&r.Route)
break
}
}
if len(metadata) == 0 {
log.Errorf("metadata route not found")
return routes
}

endpointRegisty := pp.options.EndpointRegistry

for _, r := range routes {
if r.Route.BackendType == eskip.NetworkBackend {
metrics := endpointRegisty.GetMetrics(r.Host)
addMetadata(metrics, metadata, r.Host)
} else if r.Route.BackendType == eskip.LBBackend {
for _, ep := range r.LBEndpoints {
addMetadata(ep.Metrics, metadata, ep.Host)
}
}
}
return routes
}

// metadataRoute creates a route with [MetadataRouteID] id that matches no requests and
// contains metadata for each endpoint address used by Ingresses and RouteGroups.
func metadataRoute(s *clusterState) *eskip.Route {
Expand Down Expand Up @@ -54,8 +98,50 @@ func metadataRoute(s *clusterState) *eskip.Route {
_ = json.NewEncoder(&b).Encode(&metadata)

return &eskip.Route{
Id: MetadataRouteID,
Predicates: []*eskip.Predicate{{Name: predicates.FalseName, Args: []any{b.String()}}},
Id: MetadataRouteID,
Predicates: []*eskip.Predicate{{
Name: predicates.FalseName,
// Use MetadataRouteID as the first argument to distinct from other False predicates
// that might be added by post-processors.
Args: []any{MetadataRouteID, b.String()},
}},
BackendType: eskip.ShuntBackend,
}
}

func parseMetadata(r *eskip.Route) map[string]*kubeRouteMetadataAddress {
for _, p := range r.Predicates {
if p.Name == predicates.FalseName && len(p.Args) == 2 && p.Args[0] == MetadataRouteID {
if data, ok := p.Args[1].(string); ok {
metadata := &kubeRouteMetadata{}
err := json.NewDecoder(strings.NewReader(data)).Decode(metadata)
if err != nil {
log.Errorf("failed to parse metadata route: %v", err)
return nil
}
result := make(map[string]*kubeRouteMetadataAddress)
for i := range metadata.Addresses {
addr := &metadata.Addresses[i]
result[addr.Address] = addr
}
return result
} else {
log.Errorf("failed to parse metadata route")
}
break
}
}
return nil
}

func addMetadata(metrics routing.Metrics, metadata map[string]*kubeRouteMetadataAddress, hostPort string) {
host, _, _ := net.SplitHostPort(hostPort)
if addr, ok := metadata[host]; ok {
metrics.SetTag("zone", addr.Zone)
metrics.SetTag("nodeName", addr.NodeName)
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
metrics.SetTag("pod", addr.TargetRef.Name)
metrics.SetTag("namespace", addr.TargetRef.Namespace)
}
}
}
15 changes: 15 additions & 0 deletions routing/endpointregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ type Metrics interface {
InflightRequests() int64
IncInflightRequest()
DecInflightRequest()

SetTag(name, value string)
Tag(name string) string
}

type entry struct {
detected atomic.Value // time.Time
lastSeen atomic.Value // time.Time
inflightRequests atomic.Int64
tags sync.Map // map[string]string
}

var _ Metrics = &entry{}
Expand Down Expand Up @@ -60,6 +64,17 @@ func (e *entry) SetLastSeen(ts time.Time) {
e.lastSeen.Store(ts)
}

func (e *entry) SetTag(name string, value string) {
e.tags.Store(name, value)
}

func (e *entry) Tag(name string) string {
if value, ok := e.tags.Load(name); ok {
return value.(string)
}
return ""
}

func newEntry() *entry {
result := &entry{}
result.SetDetected(time.Time{})
Expand Down
5 changes: 5 additions & 0 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1948,6 +1948,11 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
ro.PostProcessors = append(ro.PostProcessors, opaRegistry)
}

if kubernetes.EnableMetadataRoute {
opts := kubernetes.MetadataPostProcessorOptions{EndpointRegistry: endpointRegistry}
ro.PostProcessors = append(ro.PostProcessors, kubernetes.NewMetadataPostProcessor(opts))
}

if o.CustomRoutingPreProcessors != nil {
ro.PreProcessors = append(ro.PreProcessors, o.CustomRoutingPreProcessors...)
}
Expand Down

0 comments on commit 0c3dd2f

Please sign in to comment.