Skip to content
This repository has been archived by the owner on Jan 22, 2021. It is now read-only.

metrics: add support for more metric types #74

Merged
merged 8 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions hack/e2e-scripts/configure-queue-metrics.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ cp deploy/externalmetric.yaml deploy/externalmetric.yaml.copy
sed -i 's|sb-external-ns|'${SERVICEBUS_NAMESPACE}'|g' deploy/externalmetric.yaml
sed -i 's|sb-external-example|'${SERVICEBUS_RESOURCE_GROUP}'|g' deploy/externalmetric.yaml
sed -i 's|externalq|'${SERVICEBUS_QUEUE_NAME}'|g' deploy/externalmetric.yaml
kubectl apply -f deploy/externalmetric.yaml

rm deploy/externalmetric.yaml; mv deploy/externalmetric.yaml.copy deploy/externalmetric.yaml
AGGREGATE_TYPE=( "average" "maximum" "minimum" "total" )
# supported aggregates https://github.com/Azure/azure-sdk-for-go/blob/0acfc1d1083d148a606d380143176e218d437728/services/preview/monitor/mgmt/2018-03-01/insights/models.go#L38
for AGGREGATE in "${AGGREGATE_TYPE[@]}"
do
filePath="deploy/${AGGREGATE}.externalmetric.yaml"
echo "Creating ${AGGREGATE} external metric with file: $filePath"
cp deploy/externalmetric.yaml $filePath

# give a name to the external metric
sed -i 's|queuemessages|queuemessages-'${AGGREGATE}'|g' $filePath

# specify the aggregate type
sed -i 's|Total|'${AGGREGATE}'|g' $filePath

kubectl apply -f $filePath
done

rm deploy/*externalmetric.yaml; mv deploy/externalmetric.yaml.copy deploy/externalmetric.yaml
22 changes: 14 additions & 8 deletions hack/e2e-scripts/gen-and-check-queue-messages.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@ echo; echo "Sending $NUM messages..."

echo; echo "Checking metrics endpoint for 4 minutes..."

MSGCOUNT=$(kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/default/queuemessages" | jq .items[0].value)
MSGCOUNT=$(kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/default/queuemessages-total" | jq .items[0].value)
START=`date +%s`

while [[ ! "$MSGCOUNT" = "\"$NUM\"" ]] && [[ $(( $(date +%s) - 225 )) -lt $START ]]; do
sleep 15
MSGCOUNT=$(kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/default/queuemessages" | jq .items[0].value)
MSGCOUNT=$(kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/default/queuemessages-total" | jq .items[0].value)
echo "Endpoint returned $MSGCOUNT messages"
done

if [[ ! "$MSGCOUNT" = "\"$NUM\"" ]]; then
echo "Timed out, message count ($MSGCOUNT) not equal to number of messages sent ($NUM)"
exit 1
else
echo "Message count equal to number of messages sent, metrics adapter working correctly"
fi
AGGREGATE_TYPE=( "average" "maximum" "minimum" "total" )
for AGGREGATE in "${AGGREGATE_TYPE[@]}"
do
METRIC_NAME="queuemessages-${AGGREGATE}"
VALUE=$(kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/default/${METRIC_NAME}" | jq .items[0].value)
if [[ ! "$VALUE" = "\"$NUM\"" ]]; then
akshaysngupta marked this conversation as resolved.
Show resolved Hide resolved
echo "Timed out, message aggregate type: ${AGGREGATE} value: ${VALUE} not equal to number of messages sent ($NUM)"
exit 1
else
echo "message aggregate type: ${AGGREGATE} value: ${VALUE} is equal to number of messages sent ($NUM), metrics adapter working correctly"
fi
done
2 changes: 1 addition & 1 deletion pkg/azure/externalmetrics/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package externalmetrics

type AzureExternalMetricResponse struct {
Total float64
Value float64
}

type AzureExternalMetricClient interface {
Expand Down
61 changes: 48 additions & 13 deletions pkg/azure/externalmetrics/monitor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package externalmetrics

import (
"context"
"fmt"
"strings"

"github.com/Azure/azure-sdk-for-go/services/preview/monitor/mgmt/2018-03-01/insights"
"github.com/Azure/go-autorest/autorest/azure/auth"
Expand Down Expand Up @@ -55,23 +57,56 @@ func (c *monitorClient) GetAzureMetric(azMetricRequest AzureExternalMetricReques
return AzureExternalMetricResponse{}, err
}

total := extractValue(metricResult)
value, err := extractValue(azMetricRequest, metricResult)

klog.V(2).Infof("found metric value: %f", total)

// TODO set Value based on aggregations type
return AzureExternalMetricResponse{
Total: total,
}, nil
Value: value,
}, err
}

func extractValue(metricResult insights.Response) float64 {
//TODO extract value based on aggregation type
//TODO check for nils
akshaysngupta marked this conversation as resolved.
Show resolved Hide resolved
func extractValue(azMetricRequest AzureExternalMetricRequest, metricResult insights.Response) (float64, error) {
metricVals := *metricResult.Value
Timeseries := *metricVals[0].Timeseries
data := *Timeseries[0].Data
total := *data[len(data)-1].Total
akshaysngupta marked this conversation as resolved.
Show resolved Hide resolved

return total
if len(metricVals) == 0 {
err := fmt.Errorf("Got an empty response for metric %s/%s and aggregate type %s", azMetricRequest.Namespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)))
return 0, err
}

timeseries := *metricVals[0].Timeseries
if timeseries == nil {
err := fmt.Errorf("Got metric result for %s/%s and aggregate type %s without timeseries", azMetricRequest.Namespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)))
return 0, err
}

data := *timeseries[0].Data
if data == nil {
err := fmt.Errorf("Got metric result for %s/%s and aggregate type %s without any metric values", azMetricRequest.Namespace, azMetricRequest.MetricName, insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)))
return 0, err
}

var valuePtr *float64
if strings.EqualFold(string(insights.Average), azMetricRequest.Aggregation) && data[len(data)-1].Average != nil {
akshaysngupta marked this conversation as resolved.
Show resolved Hide resolved
valuePtr = data[len(data)-1].Average
} else if strings.EqualFold(string(insights.Total), azMetricRequest.Aggregation) && data[len(data)-1].Total != nil {
valuePtr = data[len(data)-1].Total
} else if strings.EqualFold(string(insights.Maximum), azMetricRequest.Aggregation) && data[len(data)-1].Maximum != nil {
valuePtr = data[len(data)-1].Maximum
} else if strings.EqualFold(string(insights.Minimum), azMetricRequest.Aggregation) && data[len(data)-1].Minimum != nil {
valuePtr = data[len(data)-1].Minimum
} else if strings.EqualFold(string(insights.Count), azMetricRequest.Aggregation) && data[len(data)-1].Count != nil {
fValue := float64(*data[len(data)-1].Count)
valuePtr = &fValue
} else {
err := fmt.Errorf("Unsupported aggregation type %s specified in metric %s/%s", insights.AggregationType(strings.ToTitle(azMetricRequest.Aggregation)), azMetricRequest.Namespace, azMetricRequest.MetricName)
return 0, err
}

if valuePtr == nil {
err := fmt.Errorf("Unable to get value for metric %s/%s with aggregation %s. No value returned by the Azure Monitor", azMetricRequest.Namespace, azMetricRequest.MetricName, azMetricRequest.Aggregation)
return 0, err
}

klog.V(2).Infof("metric type: %s %f", azMetricRequest.Aggregation, *valuePtr)

return *valuePtr, nil
}
52 changes: 36 additions & 16 deletions pkg/azure/externalmetrics/monitor_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestAzureMonitorIfFailedResponseGetError(t *testing.T) {

client := newMonitorClient("", monitorClient)

request := newAzureMonitorMetricRequest()
request := newAzureMonitorMetricRequest(insights.Average)
_, err := client.GetAzureMetric(request)

if err == nil {
Expand All @@ -44,28 +44,46 @@ func TestAzureMonitorIfFailedResponseGetError(t *testing.T) {
}

func TestAzureMonitorIfValidRequestGetResult(t *testing.T) {
response := makeAzureMonitorResponse(15)
monitorClient := newFakeMonitorClient(response, nil)
aggregateList := []insights.AggregationType{
insights.Average,
insights.Minimum,
insights.Maximum,
insights.Total,
}

client := newMonitorClient("", monitorClient)
for _, agg := range aggregateList {
response := makeAzureMonitorResponse(agg, 15)
monitorClient := newFakeMonitorClient(response, nil)

request := newAzureMonitorMetricRequest()
metricResponse, err := client.GetAzureMetric(request)
client := newMonitorClient("", monitorClient)

if err != nil {
t.Errorf("error after processing got: %v, want nil", err)
}
request := newAzureMonitorMetricRequest(agg)
metricResponse, err := client.GetAzureMetric(request)

if metricResponse.Total != 15 {
t.Errorf("metricResponse.Total = %v, want = %v", metricResponse.Total, 15)
if err != nil {
t.Errorf("error after processing got: %v, want nil", err)
}

if metricResponse.Value != 15 {
t.Errorf("metricresponse.Value = %v, want = %v", metricResponse.Value, 15)
}
}
}

func makeAzureMonitorResponse(value float64) insights.Response {
func makeAzureMonitorResponse(aggregateType insights.AggregationType, value float64) insights.Response {
// create metric value
mv := insights.MetricValue{
Total: &value,
mv := insights.MetricValue{}
switch aggregateType {
case insights.Average:
mv.Average = &value
case insights.Minimum:
mv.Minimum = &value
case insights.Maximum:
mv.Maximum = &value
case insights.Total:
mv.Total = &value
}

metricValues := []insights.MetricValue{}
metricValues = append(metricValues, mv)

Expand All @@ -77,8 +95,10 @@ func makeAzureMonitorResponse(value float64) insights.Response {
timeseries = append(timeseries, te)

// create metric
aType := string(aggregateType)
metric := insights.Metric{
Timeseries: &timeseries,
Type: &aType,
}
metrics := []insights.Metric{}
metrics = append(metrics, metric)
Expand All @@ -90,7 +110,7 @@ func makeAzureMonitorResponse(value float64) insights.Response {
return response
}

func newAzureMonitorMetricRequest() AzureExternalMetricRequest {
func newAzureMonitorMetricRequest(aggregationType insights.AggregationType) AzureExternalMetricRequest {
return AzureExternalMetricRequest{
ResourceGroup: "ResourceGroup",
ResourceName: "ResourceName",
Expand All @@ -99,7 +119,7 @@ func newAzureMonitorMetricRequest() AzureExternalMetricRequest {
SubscriptionID: "SubscriptionID",
MetricName: "MetricName",
Filter: "Filter",
Aggregation: "Aggregation",
Aggregation: string(aggregationType),
Timespan: "PT10",
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ func (c *servicebusClient) GetAzureMetric(azMetricRequest AzureExternalMetricReq

// TODO set Value based on aggregations type
return AzureExternalMetricResponse{
Total: activeMessageCount,
Value: activeMessageCount,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestIfValidRequestGetResult(t *testing.T) {
t.Errorf("error after processing got: %v, want nil", err)
}

if metricResponse.Total != 15 {
t.Errorf("metricResponse.Total = %v, want = %v", metricResponse.Total, 15)
if metricResponse.Value != 15 {
t.Errorf("metricResponse.Total = %v, want = %v", metricResponse.Value, 15)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider_external.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (p *AzureProvider) GetExternalMetric(namespace string, metricSelector label

externalmetric := external_metrics.ExternalMetricValue{
MetricName: info.Metric,
Value: *resource.NewQuantity(int64(metricValue.Total), resource.DecimalSI),
Value: *resource.NewQuantity(int64(metricValue.Value), resource.DecimalSI),
Timestamp: metav1.Now(),
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type fakeAzureExternalClientFactory struct {
func (f fakeAzureExternalClientFactory) GetAzureExternalMetricClient(clientType string) (client externalmetrics.AzureExternalMetricClient, err error) {
fakeClient := fakeAzureMonitorClient{
err: nil,
result: externalmetrics.AzureExternalMetricResponse{Total: 15},
result: externalmetrics.AzureExternalMetricResponse{Value: 15},
}

return fakeClient, nil
Expand Down