Skip to content

Commit

Permalink
dataclients/kubernetes: use route pre-processor to decode metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Yastrebov <alexander.yastrebov@zalando.de>
  • Loading branch information
AlexanderYastrebov committed Jan 12, 2024
1 parent 43eff37 commit 113fdb0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 43 deletions.
88 changes: 53 additions & 35 deletions dataclients/kubernetes/metadataroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"net"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/eskip"
"github.com/zalando/skipper/predicates"
"github.com/zalando/skipper/routing"

snet "github.com/zalando/skipper/net"
)

type MetadataPostProcessorOptions struct {
type MetadataPreProcessorOptions struct {
EndpointRegistry *routing.EndpointRegistry
}

type postProcessor struct {
options *MetadataPostProcessorOptions
type metadataPreProcessor struct {
options MetadataPreProcessorOptions
}

type kubeRouteMetadata struct {
Expand All @@ -31,39 +32,48 @@ type kubeRouteMetadataAddress struct {
TargetRef *objectReference `json:"targetRef"`
}

// NewMetadataPostProcessor creates post-processor for metadata route
func NewMetadataPostProcessor(options MetadataPostProcessorOptions) routing.PostProcessor {
return &postProcessor{options: &options}
// NewMetadataPreProcessor creates post-processor for metadata route
func NewMetadataPreProcessor(options MetadataPreProcessorOptions) routing.PreProcessor {
return &metadataPreProcessor{options: options}
}

func (pp *postProcessor) Do(routes []*routing.Route) []*routing.Route {
var metadata map[string]*kubeRouteMetadataAddress
var err error
func (pp *metadataPreProcessor) Do(routes []*eskip.Route) []*eskip.Route {
var metadataRoute *eskip.Route
filtered := make([]*eskip.Route, 0, len(routes))

for _, r := range routes {
if r.Id == MetadataRouteID {
metadata, err = decodeMetadata(&r.Route)
if err != nil {
log.Errorf("Failed to decode metadata: %v", err)
return routes
if metadataRoute == nil {
metadataRoute = r
} else {
log.Errorf("Found multiple metadata routes, using the first one")
}
break
} else {
filtered = append(filtered, r)
}
}

endpointRegisty := pp.options.EndpointRegistry
if metadataRoute == nil {
log.Errorf("Metadata route not found")
return routes
}

for _, r := range routes {
if r.Route.BackendType == eskip.NetworkBackend {
metrics := endpointRegisty.GetMetrics(r.Host)
addMetadata(metrics, metadata, r.Host)
} else if r.Route.BackendType == eskip.LBBackend {
metadata, err := decodeMetadata(metadataRoute)
if err != nil {
log.Errorf("Failed to decode metadata route: %v", err)
return filtered
}

for _, r := range filtered {
if r.BackendType == eskip.NetworkBackend {
pp.addMetadata(metadata, r.Backend)
} else if r.BackendType == eskip.LBBackend {
for _, ep := range r.LBEndpoints {
addMetadata(ep.Metrics, metadata, ep.Host)
pp.addMetadata(metadata, ep)
}
}
}
return routes
return filtered
}

// metadataRoute creates a route with [MetadataRouteID] id that matches no requests and
Expand Down Expand Up @@ -119,18 +129,6 @@ func decodeMetadata(r *eskip.Route) (map[string]*kubeRouteMetadataAddress, error
return result, nil
}

func addMetadata(metrics routing.Metrics, metadata map[string]*kubeRouteMetadataAddress, hostPort string) {
host, _, _ := net.SplitHostPort(hostPort)
if addr, ok := metadata[host]; ok {
metrics.SetTag("zone", addr.Zone)
metrics.SetTag("nodeName", addr.NodeName)
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
metrics.SetTag("pod", addr.TargetRef.Name)
metrics.SetTag("namespace", addr.TargetRef.Namespace)
}
}
}

const dataUriPrefix = "data:application/json;base64,"

// encodeDataURI encodes metadata into data URI.
Expand Down Expand Up @@ -161,3 +159,23 @@ func decodeDataURI(uri string) (*kubeRouteMetadata, error) {
}
return &metadata, nil
}

func (pp *metadataPreProcessor) addMetadata(metadata map[string]*kubeRouteMetadataAddress, endpoint string) {
_, host, err := snet.SchemeHost(endpoint)
if err != nil {
return
}

addr, ok := metadata[host]
if !ok {
return
}

metrics := pp.options.EndpointRegistry.GetMetrics(host)
metrics.SetTag("zone", addr.Zone)
metrics.SetTag("nodeName", addr.NodeName)
if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
metrics.SetTag("pod", addr.TargetRef.Name)
metrics.SetTag("namespace", addr.TargetRef.Namespace)
}
}
3 changes: 0 additions & 3 deletions net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ func SchemeHost(input string) (string, string, error) {
if u.Scheme == "" {
return "", "", fmt.Errorf(`parse %q: missing scheme`, input)
}
if u.Scheme == "data" {
return u.Scheme, "", nil
}
if u.Host == "" {
return "", "", fmt.Errorf(`parse %q: missing host`, input)
}
Expand Down
10 changes: 5 additions & 5 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,11 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
ro.PostProcessors = append(ro.PostProcessors, failClosedRatelimitPostProcessor)
}

if kubernetes.EnableMetadataRoute {
opts := kubernetes.MetadataPreProcessorOptions{EndpointRegistry: endpointRegistry}
ro.PreProcessors = append(ro.PreProcessors, kubernetes.NewMetadataPreProcessor(opts))
}

if o.DefaultFilters != nil {
ro.PreProcessors = append(ro.PreProcessors, o.DefaultFilters)
}
Expand All @@ -1948,11 +1953,6 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
ro.PostProcessors = append(ro.PostProcessors, opaRegistry)
}

if kubernetes.EnableMetadataRoute {
opts := kubernetes.MetadataPostProcessorOptions{EndpointRegistry: endpointRegistry}
ro.PostProcessors = append(ro.PostProcessors, kubernetes.NewMetadataPostProcessor(opts))
}

if o.CustomRoutingPreProcessors != nil {
ro.PreProcessors = append(ro.PreProcessors, o.CustomRoutingPreProcessors...)
}
Expand Down

0 comments on commit 113fdb0

Please sign in to comment.