diff --git a/cmd/dlx/app/dlx.go b/cmd/dlx/app/dlx.go index 0a0da55..7ee4b84 100644 --- a/cmd/dlx/app/dlx.go +++ b/cmd/dlx/app/dlx.go @@ -18,7 +18,8 @@ func Run(kubeconfigPath string, targetPathHeader string, targetPort int, listenAddress string, - resourceReadinessTimeout string) error { + resourceReadinessTimeout string, + multiTargetStrategy string) error { pluginLoader, err := pluginloader.New() if err != nil { return errors.Wrap(err, "Failed to initialize plugin loader") @@ -41,6 +42,7 @@ func Run(kubeconfigPath string, ListenAddress: listenAddress, Namespace: namespace, ResourceReadinessTimeout: scalertypes.Duration{Duration: resourceReadinessTimeoutDuration}, + MultiTargetStrategy: scalertypes.MultiTargetStrategy(multiTargetStrategy), } // see if resource scaler wants to override the arguments diff --git a/cmd/dlx/main.go b/cmd/dlx/main.go index 0c9856e..c9eba96 100644 --- a/cmd/dlx/main.go +++ b/cmd/dlx/main.go @@ -18,6 +18,7 @@ func main() { targetPort := flag.Int("target-port", 0, "Name of the header that holds information on target port") listenAddress := flag.String("listen-address", ":8090", "Address to listen upon for http proxy") resourceReadinessTimeout := flag.String("resource-readiness-timeout", "5m", "maximum wait time for the resource to be ready") + multiTargetStrategy := flag.String("multi-target-strategy", "random", "Strategy for selecting to which target to send the request") flag.Parse() *namespace = common.GetNamespace(*namespace) @@ -28,7 +29,8 @@ func main() { *targetPathHeader, *targetPort, *listenAddress, - *resourceReadinessTimeout); err != nil { + *resourceReadinessTimeout, + *multiTargetStrategy); err != nil { errors.PrintErrorStack(os.Stderr, err, 5) os.Exit(1) diff --git a/pkg/dlx/dlx.go b/pkg/dlx/dlx.go index 64f91f4..8fea5ec 100644 --- a/pkg/dlx/dlx.go +++ b/pkg/dlx/dlx.go @@ -34,7 +34,8 @@ func NewDLX(parentLogger logger.Logger, resourceScaler, options.TargetNameHeader, options.TargetPathHeader, - options.TargetPort) + options.TargetPort, + options.MultiTargetStrategy) if err != nil { return nil, errors.Wrap(err, "Failed to create handler") } diff --git a/pkg/dlx/handler.go b/pkg/dlx/handler.go index 7ab7d95..6ee8753 100644 --- a/pkg/dlx/handler.go +++ b/pkg/dlx/handler.go @@ -2,9 +2,12 @@ package dlx import ( "fmt" + "math/rand" "net/http" "net/http/httputil" "net/url" + "strings" + "time" "github.com/v3io/scaler/pkg/scalertypes" @@ -13,13 +16,14 @@ import ( ) type Handler struct { - logger logger.Logger - HandleFunc func(http.ResponseWriter, *http.Request) - resourceStarter *ResourceStarter - resourceScaler scalertypes.ResourceScaler - targetNameHeader string - targetPathHeader string - targetPort int + logger logger.Logger + HandleFunc func(http.ResponseWriter, *http.Request) + resourceStarter *ResourceStarter + resourceScaler scalertypes.ResourceScaler + targetNameHeader string + targetPathHeader string + targetPort int + multiTargetStrategy scalertypes.MultiTargetStrategy } func NewHandler(parentLogger logger.Logger, @@ -27,23 +31,23 @@ func NewHandler(parentLogger logger.Logger, resourceScaler scalertypes.ResourceScaler, targetNameHeader string, targetPathHeader string, - targetPort int) (Handler, error) { + targetPort int, + multiTargetStrategy scalertypes.MultiTargetStrategy) (Handler, error) { h := Handler{ - logger: parentLogger.GetChild("handler"), - resourceStarter: resourceStarter, - resourceScaler: resourceScaler, - targetNameHeader: targetNameHeader, - targetPathHeader: targetPathHeader, - targetPort: targetPort, + logger: parentLogger.GetChild("handler"), + resourceStarter: resourceStarter, + resourceScaler: resourceScaler, + targetNameHeader: targetNameHeader, + targetPathHeader: targetPathHeader, + targetPort: targetPort, + multiTargetStrategy: multiTargetStrategy, } h.HandleFunc = h.handleRequest return h, nil } func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) { - var targetURL *url.URL - var err error - var resourceName string + var resourceNames []string responseChannel := make(chan ResourceStatusResult, 1) defer close(responseChannel) @@ -52,54 +56,120 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) { forwardedHost := req.Header.Get("X-Forwarded-Host") forwardedPort := req.Header.Get("X-Forwarded-Port") originalURI := req.Header.Get("X-Original-Uri") - resourceName = req.Header.Get("X-Resource-Name") + resourceName := req.Header.Get("X-Resource-Name") + + resourceTargetURLMap := map[string]*url.URL{} if forwardedHost != "" && forwardedPort != "" && resourceName != "" { - targetURL, err = url.Parse(fmt.Sprintf("http://%s:%s/%s", forwardedHost, forwardedPort, originalURI)) + targetURL, err := url.Parse(fmt.Sprintf("http://%s:%s/%s", forwardedHost, forwardedPort, originalURI)) if err != nil { res.WriteHeader(h.URLBadParse(resourceName, err)) return } + resourceNames = append(resourceNames, resourceName) + resourceTargetURLMap[resourceName] = targetURL } else { - resourceName = req.Header.Get(h.targetNameHeader) + targetNameHeaderValue := req.Header.Get(h.targetNameHeader) path := req.Header.Get(h.targetPathHeader) - if resourceName == "" { + if targetNameHeaderValue == "" { h.logger.WarnWith("When ingress not set, must pass header value", "missingHeader", h.targetNameHeader) res.WriteHeader(http.StatusBadRequest) return } - h.logger.DebugWith("Resolving service name", "resourceName", resourceName) - serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName}) - if err != nil { - h.logger.WarnWith("Failed resolving service name", - "err", errors.GetErrorStackString(err, 10)) - res.WriteHeader(http.StatusInternalServerError) - return - } - targetURL, err = url.Parse(fmt.Sprintf("http://%s:%d/%s", serviceName, h.targetPort, path)) - if err != nil { - res.WriteHeader(h.URLBadParse(resourceName, err)) - return + resourceNames = strings.Split(targetNameHeaderValue, ",") + for _, resourceName := range resourceNames { + targetURL, status := h.parseTargetURL(resourceName, path) + if targetURL == nil { + res.WriteHeader(status) + return + } + + resourceTargetURLMap[resourceName] = targetURL } } - h.resourceStarter.handleResourceStart(resourceName, responseChannel) - statusResult := <-responseChannel + statusResult := h.startResources(resourceNames) - if statusResult.Error != nil { - h.logger.WarnWith("Failed to forward request to resource", - "resource", statusResult.ResourceName, - "err", errors.GetErrorStackString(statusResult.Error, 10)) + if statusResult != nil && statusResult.Error != nil { res.WriteHeader(statusResult.Status) return } + targetURL, err := h.selectTargetURL(resourceNames, resourceTargetURLMap) + if err != nil { + res.WriteHeader(http.StatusInternalServerError) + return + } + h.logger.DebugWith("Creating reverse proxy", "targetURL", targetURL) proxy := httputil.NewSingleHostReverseProxy(targetURL) proxy.ServeHTTP(res, req) } +func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) { + h.logger.DebugWith("Resolving service name", "resourceName", resourceName) + serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName}) + if err != nil { + h.logger.WarnWith("Failed resolving service name", + "err", errors.GetErrorStackString(err, 10)) + return nil, http.StatusInternalServerError + } + targetURL, err := url.Parse(fmt.Sprintf("http://%s:%d/%s", serviceName, h.targetPort, path)) + if err != nil { + return nil, h.URLBadParse(resourceName, err) + } + return targetURL, 0 +} + +func (h *Handler) startResources(resourceNames []string) *ResourceStatusResult { + responseChannel := make(chan ResourceStatusResult, len(resourceNames)) + defer close(responseChannel) + + // Start all resources in separate go routines + for _, resourceName := range resourceNames { + h.resourceStarter.handleResourceStart(resourceName, responseChannel) + } + + // Wait for all resources to finish starting + for range resourceNames { + statusResult := <-responseChannel + + if statusResult.Error != nil { + h.logger.WarnWith("Failed to start resource", + "resource", statusResult.ResourceName, + "err", errors.GetErrorStackString(statusResult.Error, 10)) + return &statusResult + } + } + + return nil +} + +func (h *Handler) selectTargetURL(resourceNames []string, resourceTargetURLMap map[string]*url.URL) (*url.URL, error) { + if len(resourceNames) == 1 { + return resourceTargetURLMap[resourceNames[0]], nil + } else if len(resourceNames) != 2 { + h.logger.WarnWith("Unsupported amount of targets", + "targetsAmount", len(resourceNames)) + return nil, errors.Errorf("Unsupported amount of targets: %d", len(resourceNames)) + } + + switch h.multiTargetStrategy { + case scalertypes.MultiTargetStrategyRandom: + rand.Seed(time.Now().Unix()) + return resourceTargetURLMap[resourceNames[rand.Intn(len(resourceNames))]], nil + case scalertypes.MultiTargetStrategyPrimary: + return resourceTargetURLMap[resourceNames[0]], nil + case scalertypes.MultiTargetStrategyCanary: + return resourceTargetURLMap[resourceNames[1]], nil + default: + h.logger.WarnWith("Unsupported multi target strategy", + "strategy", h.multiTargetStrategy) + return nil, errors.Errorf("Unsupported multi target strategy: %s", h.multiTargetStrategy) + } +} + func (h *Handler) URLBadParse(resourceName string, err error) int { h.logger.Warn("Failed to parse url for resource", "resourceName", resourceName, diff --git a/pkg/scalertypes/types.go b/pkg/scalertypes/types.go index 1b58ad9..9012a99 100644 --- a/pkg/scalertypes/types.go +++ b/pkg/scalertypes/types.go @@ -22,6 +22,14 @@ type ResourceScalerConfig struct { DLXOptions DLXOptions } +type MultiTargetStrategy string + +const ( + MultiTargetStrategyRandom MultiTargetStrategy = "random" + MultiTargetStrategyPrimary MultiTargetStrategy = "primary" + MultiTargetStrategyCanary MultiTargetStrategy = "canary" +) + type DLXOptions struct { Namespace string @@ -31,6 +39,7 @@ type DLXOptions struct { TargetPort int ListenAddress string ResourceReadinessTimeout Duration + MultiTargetStrategy MultiTargetStrategy } type ResourceScaler interface {