Skip to content

Commit

Permalink
[DLX] Multi Target wake and proxy (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
quaark authored Jul 22, 2021
1 parent 3491879 commit 6cafa77
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 42 deletions.
4 changes: 3 additions & 1 deletion cmd/dlx/app/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func Run(kubeconfigPath string,
targetPathHeader string,
targetPort int,
listenAddress string,
resourceReadinessTimeout string) error {
resourceReadinessTimeout string,
multiTargetStrategy string) error {
pluginLoader, err := pluginloader.New()
if err != nil {
return errors.Wrap(err, "Failed to initialize plugin loader")
Expand All @@ -41,6 +42,7 @@ func Run(kubeconfigPath string,
ListenAddress: listenAddress,
Namespace: namespace,
ResourceReadinessTimeout: scalertypes.Duration{Duration: resourceReadinessTimeoutDuration},
MultiTargetStrategy: scalertypes.MultiTargetStrategy(multiTargetStrategy),
}

// see if resource scaler wants to override the arguments
Expand Down
4 changes: 3 additions & 1 deletion cmd/dlx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func main() {
targetPort := flag.Int("target-port", 0, "Name of the header that holds information on target port")
listenAddress := flag.String("listen-address", ":8090", "Address to listen upon for http proxy")
resourceReadinessTimeout := flag.String("resource-readiness-timeout", "5m", "maximum wait time for the resource to be ready")
multiTargetStrategy := flag.String("multi-target-strategy", "random", "Strategy for selecting to which target to send the request")
flag.Parse()

*namespace = common.GetNamespace(*namespace)
Expand All @@ -28,7 +29,8 @@ func main() {
*targetPathHeader,
*targetPort,
*listenAddress,
*resourceReadinessTimeout); err != nil {
*resourceReadinessTimeout,
*multiTargetStrategy); err != nil {
errors.PrintErrorStack(os.Stderr, err, 5)

os.Exit(1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/dlx/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func NewDLX(parentLogger logger.Logger,
resourceScaler,
options.TargetNameHeader,
options.TargetPathHeader,
options.TargetPort)
options.TargetPort,
options.MultiTargetStrategy)
if err != nil {
return nil, errors.Wrap(err, "Failed to create handler")
}
Expand Down
148 changes: 109 additions & 39 deletions pkg/dlx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package dlx

import (
"fmt"
"math/rand"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"time"

"github.com/v3io/scaler/pkg/scalertypes"

Expand All @@ -13,37 +16,38 @@ import (
)

type Handler struct {
logger logger.Logger
HandleFunc func(http.ResponseWriter, *http.Request)
resourceStarter *ResourceStarter
resourceScaler scalertypes.ResourceScaler
targetNameHeader string
targetPathHeader string
targetPort int
logger logger.Logger
HandleFunc func(http.ResponseWriter, *http.Request)
resourceStarter *ResourceStarter
resourceScaler scalertypes.ResourceScaler
targetNameHeader string
targetPathHeader string
targetPort int
multiTargetStrategy scalertypes.MultiTargetStrategy
}

func NewHandler(parentLogger logger.Logger,
resourceStarter *ResourceStarter,
resourceScaler scalertypes.ResourceScaler,
targetNameHeader string,
targetPathHeader string,
targetPort int) (Handler, error) {
targetPort int,
multiTargetStrategy scalertypes.MultiTargetStrategy) (Handler, error) {
h := Handler{
logger: parentLogger.GetChild("handler"),
resourceStarter: resourceStarter,
resourceScaler: resourceScaler,
targetNameHeader: targetNameHeader,
targetPathHeader: targetPathHeader,
targetPort: targetPort,
logger: parentLogger.GetChild("handler"),
resourceStarter: resourceStarter,
resourceScaler: resourceScaler,
targetNameHeader: targetNameHeader,
targetPathHeader: targetPathHeader,
targetPort: targetPort,
multiTargetStrategy: multiTargetStrategy,
}
h.HandleFunc = h.handleRequest
return h, nil
}

func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
var targetURL *url.URL
var err error
var resourceName string
var resourceNames []string

responseChannel := make(chan ResourceStatusResult, 1)
defer close(responseChannel)
Expand All @@ -52,54 +56,120 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
forwardedHost := req.Header.Get("X-Forwarded-Host")
forwardedPort := req.Header.Get("X-Forwarded-Port")
originalURI := req.Header.Get("X-Original-Uri")
resourceName = req.Header.Get("X-Resource-Name")
resourceName := req.Header.Get("X-Resource-Name")

resourceTargetURLMap := map[string]*url.URL{}

if forwardedHost != "" && forwardedPort != "" && resourceName != "" {
targetURL, err = url.Parse(fmt.Sprintf("http://%s:%s/%s", forwardedHost, forwardedPort, originalURI))
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s/%s", forwardedHost, forwardedPort, originalURI))
if err != nil {
res.WriteHeader(h.URLBadParse(resourceName, err))
return
}
resourceNames = append(resourceNames, resourceName)
resourceTargetURLMap[resourceName] = targetURL
} else {
resourceName = req.Header.Get(h.targetNameHeader)
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
path := req.Header.Get(h.targetPathHeader)
if resourceName == "" {
if targetNameHeaderValue == "" {
h.logger.WarnWith("When ingress not set, must pass header value",
"missingHeader", h.targetNameHeader)
res.WriteHeader(http.StatusBadRequest)
return
}
h.logger.DebugWith("Resolving service name", "resourceName", resourceName)
serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName})
if err != nil {
h.logger.WarnWith("Failed resolving service name",
"err", errors.GetErrorStackString(err, 10))
res.WriteHeader(http.StatusInternalServerError)
return
}
targetURL, err = url.Parse(fmt.Sprintf("http://%s:%d/%s", serviceName, h.targetPort, path))
if err != nil {
res.WriteHeader(h.URLBadParse(resourceName, err))
return
resourceNames = strings.Split(targetNameHeaderValue, ",")
for _, resourceName := range resourceNames {
targetURL, status := h.parseTargetURL(resourceName, path)
if targetURL == nil {
res.WriteHeader(status)
return
}

resourceTargetURLMap[resourceName] = targetURL
}
}

h.resourceStarter.handleResourceStart(resourceName, responseChannel)
statusResult := <-responseChannel
statusResult := h.startResources(resourceNames)

if statusResult.Error != nil {
h.logger.WarnWith("Failed to forward request to resource",
"resource", statusResult.ResourceName,
"err", errors.GetErrorStackString(statusResult.Error, 10))
if statusResult != nil && statusResult.Error != nil {
res.WriteHeader(statusResult.Status)
return
}

targetURL, err := h.selectTargetURL(resourceNames, resourceTargetURLMap)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
return
}

h.logger.DebugWith("Creating reverse proxy", "targetURL", targetURL)
proxy := httputil.NewSingleHostReverseProxy(targetURL)
proxy.ServeHTTP(res, req)
}

func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
h.logger.DebugWith("Resolving service name", "resourceName", resourceName)
serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName})
if err != nil {
h.logger.WarnWith("Failed resolving service name",
"err", errors.GetErrorStackString(err, 10))
return nil, http.StatusInternalServerError
}
targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d/%s", serviceName, h.targetPort, path))
if err != nil {
return nil, h.URLBadParse(resourceName, err)
}
return targetURL, 0
}

func (h *Handler) startResources(resourceNames []string) *ResourceStatusResult {
responseChannel := make(chan ResourceStatusResult, len(resourceNames))
defer close(responseChannel)

// Start all resources in separate go routines
for _, resourceName := range resourceNames {
h.resourceStarter.handleResourceStart(resourceName, responseChannel)
}

// Wait for all resources to finish starting
for range resourceNames {
statusResult := <-responseChannel

if statusResult.Error != nil {
h.logger.WarnWith("Failed to start resource",
"resource", statusResult.ResourceName,
"err", errors.GetErrorStackString(statusResult.Error, 10))
return &statusResult
}
}

return nil
}

func (h *Handler) selectTargetURL(resourceNames []string, resourceTargetURLMap map[string]*url.URL) (*url.URL, error) {
if len(resourceNames) == 1 {
return resourceTargetURLMap[resourceNames[0]], nil
} else if len(resourceNames) != 2 {
h.logger.WarnWith("Unsupported amount of targets",
"targetsAmount", len(resourceNames))
return nil, errors.Errorf("Unsupported amount of targets: %d", len(resourceNames))
}

switch h.multiTargetStrategy {
case scalertypes.MultiTargetStrategyRandom:
rand.Seed(time.Now().Unix())
return resourceTargetURLMap[resourceNames[rand.Intn(len(resourceNames))]], nil
case scalertypes.MultiTargetStrategyPrimary:
return resourceTargetURLMap[resourceNames[0]], nil
case scalertypes.MultiTargetStrategyCanary:
return resourceTargetURLMap[resourceNames[1]], nil
default:
h.logger.WarnWith("Unsupported multi target strategy",
"strategy", h.multiTargetStrategy)
return nil, errors.Errorf("Unsupported multi target strategy: %s", h.multiTargetStrategy)
}
}

func (h *Handler) URLBadParse(resourceName string, err error) int {
h.logger.Warn("Failed to parse url for resource",
"resourceName", resourceName,
Expand Down
9 changes: 9 additions & 0 deletions pkg/scalertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ type ResourceScalerConfig struct {
DLXOptions DLXOptions
}

type MultiTargetStrategy string

const (
MultiTargetStrategyRandom MultiTargetStrategy = "random"
MultiTargetStrategyPrimary MultiTargetStrategy = "primary"
MultiTargetStrategyCanary MultiTargetStrategy = "canary"
)

type DLXOptions struct {
Namespace string

Expand All @@ -31,6 +39,7 @@ type DLXOptions struct {
TargetPort int
ListenAddress string
ResourceReadinessTimeout Duration
MultiTargetStrategy MultiTargetStrategy
}

type ResourceScaler interface {
Expand Down

0 comments on commit 6cafa77

Please sign in to comment.