diff --git a/go.mod b/go.mod index 82df00cf902..c110159c170 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.6.1 + github.com/tidwall/gjson v1.6.1 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c google.golang.org/api v0.29.0 google.golang.org/genproto v0.0.0-20200731012542-8145dea6a485 diff --git a/go.sum b/go.sum index c099dd2cae9..1f4fdadc024 100644 --- a/go.sum +++ b/go.sum @@ -1127,7 +1127,13 @@ github.com/tektoncd/plumbing v0.0.0-20200217163359-cd0db6e567d2/go.mod h1:QZHgU0 github.com/tektoncd/plumbing/pipelinerun-logs v0.0.0-20191206114338-712d544c2c21/go.mod h1:S62EUWtqmejjJgUMOGB1CCCHRp6C706laH06BoALkzU= github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= +github.com/tidwall/gjson v1.6.1 h1:LRbvNuNuvAiISWg6gxLEFuCe72UKy5hDqhxW/8183ws= +github.com/tidwall/gjson v1.6.1/go.mod h1:BaHyNc5bjzYkPqgLq7mdVzeiRtULKULXLgZFKsxEHI0= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.0.2 h1:Z7S3cePv9Jwm1KwS0513MRaoUe3S01WPbLNV40pwWZU= +github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/timakin/bodyclose v0.0.0-20190930140734-f7f2e9bca95e/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94/go.mod h1:Qimiffbc6q9tBWlVV6x0P9sat/ao1xEkREYPPj9hphk= github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= diff --git a/pkg/scalers/metrics_api_scaler.go b/pkg/scalers/metrics_api_scaler.go new file mode 100644 index 00000000000..439dbfbb3c7 --- /dev/null +++ b/pkg/scalers/metrics_api_scaler.go @@ -0,0 +1,145 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + kedautil "github.com/kedacore/keda/pkg/util" + "github.com/tidwall/gjson" + "io/ioutil" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + "net/http" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "strconv" +) + +type metricsAPIScaler struct { + metadata *metricsAPIScalerMetadata +} + +type metricsAPIScalerMetadata struct { + targetValue int + url string + valueLocation string +} + +var httpLog = logf.Log.WithName("metrics_api_scaler") + +// NewMetricsAPIScaler creates a new HTTP scaler +func NewMetricsAPIScaler(resolvedEnv, metadata, authParams map[string]string) (Scaler, error) { + meta, err := metricsAPIMetadata(resolvedEnv, metadata, authParams) + if err != nil { + return nil, fmt.Errorf("error parsing metric API metadata: %s", err) + } + return &metricsAPIScaler{metadata: meta}, nil +} + +func metricsAPIMetadata(resolvedEnv, metadata, authParams map[string]string) (*metricsAPIScalerMetadata, error) { + meta := metricsAPIScalerMetadata{} + + if val, ok := metadata["targetValue"]; ok { + targetValue, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("targetValue parsing error %s", err.Error()) + } + meta.targetValue = targetValue + } else { + return nil, fmt.Errorf("no targetValue given in metadata") + } + + if val, ok := metadata["url"]; ok { + meta.url = val + } else { + return nil, fmt.Errorf("no url given in metadata") + } + + if val, ok := metadata["valueLocation"]; ok { + meta.valueLocation = val + } else { + return nil, fmt.Errorf("no valueLocation given in metadata") + } + + return &meta, nil +} + +// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body +func GetValueFromResponse(body []byte, valueLocation string) (int64, error) { + r := gjson.GetBytes(body, valueLocation) + if r.Type != gjson.Number { + msg := fmt.Sprintf("valueLocation must point to value of type number got: %s", r.Type.String()) + return 0, errors.New(msg) + } + return int64(r.Num), nil +} + +func (s *metricsAPIScaler) getMetricValue() (int64, error) { + r, err := http.Get(s.metadata.url) + if err != nil { + return 0, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return 0, err + } + v, err := GetValueFromResponse(b, s.metadata.valueLocation) + if err != nil { + return 0, err + } + return v, nil +} + +// Close does nothing in case of metricsAPIScaler +func (s *metricsAPIScaler) Close() error { + return nil +} + +// IsActive returns true if there are pending messages to be processed +func (s *metricsAPIScaler) IsActive(ctx context.Context) (bool, error) { + v, err := s.getMetricValue() + if err != nil { + httpLog.Error(err, fmt.Sprintf("Error when checking metric value: %s", err)) + return false, err + } + + return v > 0.0, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *metricsAPIScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) + metricName := fmt.Sprintf("%s-%s-%s", "http", kedautil.NormalizeString(s.metadata.url), s.metadata.valueLocation) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *metricsAPIScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + v, err := s.getMetricValue() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error requesting metrics endpoint: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(v, resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} diff --git a/pkg/scalers/metrics_api_scaler_test.go b/pkg/scalers/metrics_api_scaler_test.go new file mode 100644 index 00000000000..c9a736ae3ab --- /dev/null +++ b/pkg/scalers/metrics_api_scaler_test.go @@ -0,0 +1,60 @@ +package scalers + +import ( + "testing" +) + +var metricsAPIResolvedEnv = map[string]string{} +var authParams = map[string]string{} + +type metricsAPIMetadataTestData struct { + metadata map[string]string + raisesError bool +} + +var testMetricsAPIMetadata = []metricsAPIMetadataTestData{ + // No metadata + {metadata: map[string]string{}, raisesError: true}, + // OK + {metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42"}, raisesError: false}, + // Target not an int + {metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "aa"}, raisesError: true}, + // Missing metric name + {metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "targetValue": "aa"}, raisesError: true}, + // Missing url + {metadata: map[string]string{"valueLocation": "metric", "targetValue": "aa"}, raisesError: true}, + // Missing targetValue + {metadata: map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric"}, raisesError: true}, +} + +func TestParseMetricsAPIMetadata(t *testing.T) { + for _, testData := range testMetricsAPIMetadata { + _, err := metricsAPIMetadata(metricsAPIResolvedEnv, testData.metadata, authParams) + if err != nil && !testData.raisesError { + t.Error("Expected success but got error", err) + } + if err == nil && testData.raisesError { + t.Error("Expected error but got success") + } + } +} + +func TestGetValueFromResponse(t *testing.T) { + d := []byte(`{"components":[{"id": "82328e93e", "tasks": 32}],"count":2.43}`) + v, err := GetValueFromResponse(d, "components.0.tasks") + if err != nil { + t.Error("Expected success but got error", err) + } + if v != 32 { + t.Errorf("Expected %d got %d", 32, v) + } + + v, err = GetValueFromResponse(d, "count") + if err != nil { + t.Error("Expected success but got error", err) + } + if v != 2 { + t.Errorf("Expected %d got %d", 2, v) + } + +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 9bf0b3c7fa8..ca053eb9f42 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -398,6 +398,8 @@ func buildScaler(name, namespace, triggerType string, resolvedEnv, triggerMetada return scalers.NewKafkaScaler(resolvedEnv, triggerMetadata, authParams) case "liiklus": return scalers.NewLiiklusScaler(resolvedEnv, triggerMetadata) + case "metrics-api": + return scalers.NewMetricsAPIScaler(resolvedEnv, triggerMetadata, authParams) case "mysql": return scalers.NewMySQLScaler(resolvedEnv, triggerMetadata, authParams) case "postgresql":