Skip to content

Commit

Permalink
Implement FetchAllHostsMetrics for SignalFx using new API (#25)
Browse files Browse the repository at this point in the history
* Implement FetchAllHostsMetrics for SignalFx using new API

Signed-off-by: Abdul Qadeer <aqadeer@paypal.com>

* Address review comments
Fix bug in result set size
Average returned values for metrics per window size

Signed-off-by: Abdul Qadeer <aqadeer@paypal.com>

* Address review comments
  • Loading branch information
Abdul Qadeer authored Mar 3, 2021
1 parent 5651e2f commit 6c7e96e
Showing 1 changed file with 241 additions and 25 deletions.
266 changes: 241 additions & 25 deletions pkg/watcher/internal/metricsprovider/signalfx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
Expand All @@ -33,17 +34,17 @@ import (

const (
// SignalFX Request Params
DefaultSignalFxAddress = "https://api.signalfx.com"
signalFxMetricsAPI = "/v1/timeserieswindow"
// SignalFx adds a suffix to hostnames if configured
signalFxHostNameSuffix = ".group.region.gcp.com"
signalFxHostFilter = "host:"

DefaultSignalFxAddress = "https://api.signalfx.com"
signalFxMetricsAPI = "/v1/timeserieswindow"
signalFxMetdataAPI = "/v2/metrictimeseries"
signalFxHostFilter = "host:"
signalFxHostNameSuffixKey = "SIGNALFX_HOST_NAME_SUFFIX"
// SignalFX Query Params
oneMinuteResolutionMs = 60000
cpuUtilizationMetric = `sf_metric:"cpu.utilization"`
memoryUtilizationMetric = `sf_metric:"memory.utilization"`
AND = "AND"
resultSetLimit = "10000"

// Miscellaneous
httpClientTimeout = 55 * time.Second
Expand All @@ -53,6 +54,7 @@ type signalFxClient struct {
client http.Client
authToken string
signalFxAddress string
hostNameSuffix string
}

func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProviderClient, error) {
Expand All @@ -62,7 +64,7 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide
tlsConfig := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // TODO(aqadeer): Figure out a secure way to let users add SSL certs
}

hostNameSuffix, _ := os.LookupEnv(signalFxHostNameSuffixKey)
var signalFxAddress, signalFxAuthToken = DefaultSignalFxAddress, ""
if opts.Address != "" {
signalFxAddress = opts.Address
Expand All @@ -77,7 +79,8 @@ func NewSignalFxClient(opts watcher.MetricsProviderOpts) (watcher.MetricsProvide
Timeout: httpClientTimeout,
Transport: tlsConfig},
authToken: signalFxAuthToken,
signalFxAddress: signalFxAddress}, nil
signalFxAddress: signalFxAddress,
hostNameSuffix: hostNameSuffix}, nil
}

func (s signalFxClient) Name() string {
Expand All @@ -87,7 +90,7 @@ func (s signalFxClient) Name() string {
func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([]watcher.Metric, error) {
log.Debugf("fetching metrics for host %v", host)
var metrics []watcher.Metric
hostQuery := signalFxHostFilter + host + signalFxHostNameSuffix
hostQuery := signalFxHostFilter + host + s.hostNameSuffix

for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} {
uri, err := s.buildMetricURL(hostQuery, metric, window)
Expand All @@ -113,16 +116,7 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([
}

var fetchedMetric watcher.Metric
// Added default operator and rollup for signalfx client.
fetchedMetric.Operator = watcher.Average
fetchedMetric.Rollup = window.Duration
if metric == cpuUtilizationMetric {
fetchedMetric.Name = cpuUtilizationMetric
fetchedMetric.Type = watcher.CPU
} else {
fetchedMetric.Name = memoryUtilizationMetric
fetchedMetric.Type = watcher.Memory
}
addMetadata(&fetchedMetric, metric)
fetchedMetric.Value, err = decodeMetricsPayload(res)
if err != nil {
return metrics, err
Expand All @@ -133,9 +127,74 @@ func (s signalFxClient) FetchHostMetrics(host string, window *watcher.Window) ([
return metrics, nil
}

// TODO(aqadeer): Fetching metrics for all hosts is not possible currently via timeserieswindow SignalFx API
func (s signalFxClient) FetchAllHostsMetrics(*watcher.Window) (map[string][]watcher.Metric, error) {
return nil, errors.New("This function is not yet implemented")
// FetchAllHostsMetrics fetches CPU and memory utilization for every host that
// matches the wildcard host filter ("host:*" followed by the configured host
// name suffix) over the given window.
//
// For each metric two requests are made: one to the time series window API for
// the values, and one to the metadata API to resolve time series ids to host
// names. The two payloads are joined by getMetricsFromPayloads.
//
// The returned map is keyed by host name as reported in the metadata payload.
// On error, the metrics collected so far are returned together with the error.
func (s signalFxClient) FetchAllHostsMetrics(window *watcher.Window) (map[string][]watcher.Metric, error) {
	hostQuery := signalFxHostFilter + "*" + s.hostNameSuffix
	metrics := make(map[string][]watcher.Metric)
	for _, metric := range []string{cpuUtilizationMetric, memoryUtilizationMetric} {
		metricURL, err := s.buildMetricURL(hostQuery, metric, window)
		if err != nil {
			return metrics, err
		}
		metricPayload, err := s.getJSONPayload(metricURL)
		if err != nil {
			return metrics, err
		}

		metadataURL, err := s.buildMetadataURL(hostQuery, metric)
		if err != nil {
			return metrics, err
		}
		metadataPayload, err := s.getJSONPayload(metadataURL)
		if err != nil {
			return metrics, err
		}

		mappedMetrics, err := getMetricsFromPayloads(metricPayload, metadataPayload)
		if err != nil {
			return metrics, err
		}
		for host, hostMetric := range mappedMetrics {
			// hostMetric is a per-iteration copy; it is annotated and then
			// appended by value.
			addMetadata(&hostMetric, metric)
			metrics[host] = append(metrics[host], hostMetric)
		}
	}
	return metrics, nil
}

// getJSONPayload issues an authenticated GET request to uri and decodes the
// JSON response body into a generic value. A non-200 status code is reported
// as an error. The response body is closed before this method returns, so
// callers looping over several requests do not accumulate open bodies.
func (s signalFxClient) getJSONPayload(uri *url.URL) (interface{}, error) {
	req, err := http.NewRequest(http.MethodGet, uri.String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-SF-Token", s.authToken)
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("received status code: %v", resp.StatusCode)
	}

	var payload interface{}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// addMetadata stamps the operator, name and type onto metric. The operator is
// always the average operator. The CPU name/type pair is used when metricType
// equals the CPU utilization query term; every other metricType is labelled as
// memory.
func addMetadata(metric *watcher.Metric, metricType string) {
	metric.Operator = watcher.Average
	switch metricType {
	case cpuUtilizationMetric:
		metric.Name, metric.Type = cpuUtilizationMetric, watcher.CPU
	default:
		metric.Name, metric.Type = memoryUtilizationMetric, watcher.Memory
	}
}

func (s signalFxClient) buildMetricURL(host string, metric string, window *watcher.Window) (uri *url.URL, err error) {
Expand All @@ -150,14 +209,30 @@ func (s signalFxClient) buildMetricURL(host string, metric string, window *watch
builder.WriteString(fmt.Sprintf(" %v ", AND))
builder.WriteString(metric)
q.Set("query", builder.String())

q.Set("startMs", strconv.FormatInt(window.Start, 10))
q.Set("endMs", strconv.FormatInt(window.End, 10))
q.Set("startMs", strconv.FormatInt(window.Start*1000, 10))
q.Set("endMs", strconv.FormatInt(window.End*1000, 10))
q.Set("resolution", strconv.Itoa(oneMinuteResolutionMs))
uri.RawQuery = q.Encode()
return
}

// buildMetadataURL returns the metadata API URL for the given host filter and
// metric. The "query" parameter is "<host> AND <metric>" and the "limit"
// parameter is the fixed result set limit. A parse failure of the configured
// address is returned as the error with a nil URL.
func (s signalFxClient) buildMetadataURL(host string, metric string) (uri *url.URL, err error) {
	uri, err = url.Parse(s.signalFxAddress + signalFxMetdataAPI)
	if err != nil {
		return nil, err
	}

	params := uri.Query()
	params.Set("query", strings.Join([]string{host, AND, metric}, " "))
	params.Set("limit", resultSetLimit)
	uri.RawQuery = params.Encode()
	return uri, nil
}

/**
Sample payload:
{
Expand Down Expand Up @@ -206,3 +281,144 @@ func decodeMetricsPayload(payload interface{}) (float64, error) {
}
return timestampUtilisation[1].(float64), nil
}

/**
Sample metricData payload:
{
"data": {
"Ehql_bxBgAc": [
[
1600213380000,
84.64246793530153
]
],
"EuXgJm7BkAA": [
[
1614634260000,
5.450946379084264
]
],
....
....
},
"errors": []
}
https://dev.splunk.com/observability/reference/api/metrics_metadata/latest#endpoint-retrieve-metric-timeseries-metadata
Sample metaData payload:
{
"count": 5,
"partialCount": false,
"results": [
{
"active": true,
"created": 1614534848000,
"creator": null,
"dimensions": {
"host": "test.dev.com",
"sf_metric": null
},
"id": "EvVH6P7BgAA",
"lastUpdated": 0,
"lastUpdatedBy": null,
"metric": "cpu.utilization"
},
....
....
]
}
*/
// getMetricsFromPayloads joins a decoded time series payload (metricData) with
// a decoded metadata payload (metadata) and returns one metric per host.
//
// The metadata "results" entries supply the mapping from time series id to
// host name (results[i].id -> results[i].dimensions.host). The metricData
// "data" object maps each time series id to a list of [timestamp, value]
// pairs; the values of each series are averaged into a single Metric.Value.
//
// Malformed individual entries (wrong types, missing id/host, series without
// any usable value) are logged and skipped. An error is returned only when the
// top-level shape of either payload is not as expected. If several time series
// resolve to the same host, the one iterated last wins.
func getMetricsFromPayloads(metricData interface{}, metadata interface{}) (map[string]watcher.Metric, error) {
	hostMetricMap := make(map[string]watcher.Metric)

	metadataMap, ok := metadata.(map[string]interface{})
	if !ok {
		return hostMetricMap, fmt.Errorf("type conversion failed, found %T", metadata)
	}
	results := metadataMap["results"]
	if results == nil {
		return hostMetricMap, errors.New("unexpected payload: missing results field")
	}
	resultList, ok := results.([]interface{})
	if !ok {
		return hostMetricMap, fmt.Errorf("type conversion failed, found %T", results)
	}

	// Build the time series id -> host lookup from the metadata entries.
	keyHostMap := make(map[string]string, len(resultList))
	for _, v := range resultList {
		entry, ok := v.(map[string]interface{})
		if !ok {
			log.Errorf("type conversion failed, found %T", v)
			continue
		}
		id := entry["id"]
		if id == nil {
			log.Errorf("id not found in %v", v)
			continue
		}
		idStr, ok := id.(string)
		if !ok {
			log.Errorf("id not expected type string, found %T", id)
			continue
		}
		dimensions := entry["dimensions"]
		if dimensions == nil {
			log.Errorf("no dimensions found in %v", v)
			continue
		}
		dimensionMap, ok := dimensions.(map[string]interface{})
		if !ok {
			log.Errorf("type conversion failed, found %T", dimensions)
			continue
		}
		host := dimensionMap["host"]
		if host == nil {
			log.Errorf("no host found in %v", dimensions)
			continue
		}
		hostStr, ok := host.(string)
		if !ok {
			// Skip the entry; asserting to string below would panic.
			log.Errorf("host not expected type string, found %T", host)
			continue
		}
		keyHostMap[idStr] = hostStr
	}

	metricMap, ok := metricData.(map[string]interface{})
	if !ok {
		return hostMetricMap, fmt.Errorf("type conversion failed, found %T", metricData)
	}
	data := metricMap["data"]
	if data == nil {
		return hostMetricMap, errors.New("unexpected payload: missing data field")
	}
	keyMetricMap, ok := data.(map[string]interface{})
	if !ok {
		return hostMetricMap, errors.New("unable to deserialise data field")
	}

	for key, metric := range keyMetricMap {
		host, ok := keyHostMap[key]
		if !ok {
			log.Errorf("no metadata found for key %v", key)
			continue
		}
		values, ok := metric.([]interface{})
		if !ok {
			log.Errorf("unable to deserialise values for key %v", key)
			continue
		}
		if len(values) == 0 {
			log.Errorf("no metric value array could be decoded for key %v", key)
			continue
		}

		// Find the average across returned values per 1 minute resolution.
		var sum float64
		var count int
		for _, value := range values {
			timestampUtilisation, ok := value.([]interface{})
			if !ok || len(timestampUtilisation) < 2 {
				log.Errorf("unable to deserialise metric values for key %v", key)
				continue
			}
			utilisation, ok := timestampUtilisation[1].(float64)
			if !ok {
				log.Errorf("unable to typecast value to float64: %v of type %T", timestampUtilisation[1], timestampUtilisation[1])
				continue
			}
			sum += utilisation
			count++
		}
		if count == 0 {
			// Every sample was malformed; dividing would store NaN for the host.
			log.Errorf("no metric value array could be decoded for key %v", key)
			continue
		}

		hostMetricMap[host] = watcher.Metric{Value: sum / float64(count)}
	}

	return hostMetricMap, nil
}

0 comments on commit 6c7e96e

Please sign in to comment.