Skip to content

Commit

Permalink
perf(kcmas): performance optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzzhhb authored and waynepeking348 committed Oct 30, 2023
1 parent 132d12c commit bf2d03c
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 27 deletions.
24 changes: 19 additions & 5 deletions pkg/custom-metric/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/custom-metric/store"
"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

const (
Expand Down Expand Up @@ -160,15 +161,11 @@ func (m *MetricProviderImp) GetMetricBySelector(ctx context.Context, namespace s
continue
}

m.emitCustomMetricLatencyByRawMetrics(metric)
resultCount += metric.Len()
items = append(items, PackMetricValueList(metric)...)
}

// TODO, emit metrics only when this functionality switched on
for i := range items {
m.emitCustomMetricLatency(&items[i])
}

return &custom_metrics.MetricValueList{
Items: items,
}, nil
Expand Down Expand Up @@ -291,8 +288,25 @@ func (m *MetricProviderImp) ListAllExternalMetrics() []provider.ExternalMetricIn
return res
}

func (m *MetricProviderImp) emitCustomMetricLatencyByRawMetrics(metric types.Metric) {
items := metric.GetItemList()
latestItem := items[len(items)-1]
dataLatency := time.Now().Sub(time.UnixMilli(latestItem.GetTimestamp())).Microseconds()
general.Infof("query custom metrics, metric name:%v, object name:%v, object kind: %v, latest timestamp: %v(parsed: %v), data latency: %v(microseconds)", metric.GetName(),
metric.GetObjectName(), metric.GetObjectKind(), latestItem.GetTimestamp(), time.UnixMilli(latestItem.GetTimestamp()), dataLatency)
tags := []metrics.MetricTag{
{Key: "metric_name", Val: metric.GetName()},
{Key: "object_name", Val: metric.GetObjectName()},
{Key: "object_kind", Val: metric.GetObjectKind()},
}

_ = m.metricsEmitter.StoreInt64(metricsNameKCMASProviderCustomMetricLatency, dataLatency, metrics.MetricTypeNameRaw, tags...)
}

func (m *MetricProviderImp) emitCustomMetricLatency(metric *custom_metrics.MetricValue) {
dataLatency := time.Now().Sub(metric.Timestamp.Time).Microseconds()
general.Infof("query custom metrics, metric name:%v, object name:%v, object kind: %v, latest timestamp: %v(parsed: %v), data latency: %v(microseconds)", metric.Metric.Name,
metric.Metric.Name, metric.GetObjectKind(), metric.Timestamp.UnixMilli(), metric.Timestamp.Time, dataLatency)
tags := []metrics.MetricTag{
{Key: "metric_name", Val: metric.Metric.Name},
{Key: "object_name", Val: metric.DescribedObject.Name},
Expand Down
26 changes: 20 additions & 6 deletions pkg/custom-metric/store/data/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ import (

"github.com/kubewharf/katalyst-core/pkg/custom-metric/store/data/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

const (
metricsNameKCMASStoreDataLatencySet = "kcmas_store_data_latency_set"
metricsNameKCMASStoreDataLatencyGet = "kcmas_store_data_latency_get"

metricsNameKCMASStoreDataSetCost = "kcmas_store_data_cost_set"
metricsNameKCMASStoreDataGetCost = "kcmas_store_data_cost_get"

metricsNameKCMASStoreDataLength = "kcmas_store_data_length"
metricsNameKCMASStoreWindowSeconds = "kcmas_store_data_window_seconds"
)
Expand All @@ -50,10 +54,13 @@ func NewCachedMetric(metricsEmitter metrics.MetricEmitter) *CachedMetric {
}

func (c *CachedMetric) AddSeriesMetric(sList ...types.Metric) {
now := time.Now()
start := time.Now()

c.Lock()
defer c.Unlock()
defer func() {
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataSetCost, time.Now().Sub(start).Microseconds(), metrics.MetricTypeNameRaw)
}()

var needReAggregate []*internalMetricImp
for _, s := range sList {
Expand All @@ -73,7 +80,10 @@ func (c *CachedMetric) AddSeriesMetric(sList ...types.Metric) {
if len(added) > 0 {
needReAggregate = append(needReAggregate, c.metricMap[d.MetricMetaImp][d.ObjectMetaImp])
index := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].seriesMetric.Len() - 1
costs := now.Sub(time.UnixMilli(c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].seriesMetric.Values[index].Timestamp)).Microseconds()
latestTimestamp := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].seriesMetric.Values[index].Timestamp
costs := start.Sub(time.UnixMilli(latestTimestamp)).Microseconds()
general.InfofV(6, "set cache,metric name: %v, series length: %v, add length:%v, latest timestamp: %v, costs: %v(microsecond)", d.MetricMetaImp.Name,
s.Len(), len(added), latestTimestamp, costs)
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLatencySet, costs, metrics.MetricTypeNameRaw,
types.GenerateMetaTags(d.MetricMetaImp, d.ObjectMetaImp)...)
}
Expand Down Expand Up @@ -138,11 +148,14 @@ func (c *CachedMetric) ListAllMetricNames() []string {
}

func (c *CachedMetric) GetMetric(namespace, metricName string, objName string, gr *schema.GroupResource, latest bool) ([]types.Metric, bool) {
now := time.Now()
start := time.Now()
originMetricName, aggName := types.ParseAggregator(metricName)

c.RLock()
defer c.RUnlock()
defer func() {
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataGetCost, time.Now().Sub(start).Microseconds(), metrics.MetricTypeNameRaw)
}()

var res []types.Metric
metricMeta := types.MetricMetaImp{
Expand All @@ -168,9 +181,10 @@ func (c *CachedMetric) GetMetric(namespace, metricName string, objName string, g
}

if exist && metricItem.Len() > 0 {
res = append(res, metricItem.DeepCopy())
costs := now.Sub(time.UnixMilli(internal.seriesMetric.Values[internal.len()-1].Timestamp)).Microseconds()
_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internal.generateTags()...)
res = append(res, metricItem)
// TODO this metrics costs great mount of cpu resource
//costs := now.Sub(time.UnixMilli(internal.seriesMetric.Values[internal.len()-1].Timestamp)).Microseconds()
//_ = c.emitter.StoreInt64(metricsNameKCMASStoreDataLatencyGet, costs, metrics.MetricTypeNameRaw, internal.generateTags()...)
}
}
return res, true
Expand Down
2 changes: 1 addition & 1 deletion pkg/custom-metric/store/data/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func TestCache(t *testing.T) {
},
AggregatedIdentity: types.AggregatedIdentity{
Count: 4,
Timestamp: 12 * time.Second.Milliseconds(),
Timestamp: 20 * time.Second.Milliseconds(),
WindowSeconds: 8,
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/custom-metric/store/data/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (a *internalMetricImp) aggregateMetric() {

identity := types.AggregatedIdentity{
Count: int64(a.seriesMetric.Len()),
Timestamp: a.seriesMetric.Values[0].Timestamp,
Timestamp: a.seriesMetric.Values[len(a.seriesMetric.Values)-1].Timestamp,
WindowSeconds: (a.seriesMetric.Values[a.seriesMetric.Len()-1].Timestamp - a.seriesMetric.Values[0].Timestamp) / time.Second.Milliseconds(),
}
a.aggregatedMetric[apimetric.AggregateFunctionMax] = types.NewAggregatedInternalMetric(max, identity)
Expand Down
15 changes: 12 additions & 3 deletions pkg/custom-metric/store/data/types/aggragated.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package types

import (
"fmt"
"bytes"
"math/big"
"strings"

Expand Down Expand Up @@ -108,8 +108,17 @@ func (as *AggregatedMetric) Len() int {
}

func (as *AggregatedMetric) String() string {
return fmt.Sprintf("{ObjectNamespace: %v, Name: %v, ObjectKind: %v, ObjectName: %v}",
as.GetObjectNamespace(), as.GetName(), as.GetObjectKind(), as.GetObjectName())
b := bytes.Buffer{}
b.WriteString("{ObjectNamespace: ")
b.WriteString(as.GetObjectNamespace())
b.WriteString(", Name: ")
b.WriteString(as.GetName())
b.WriteString(", ObjectKind: ")
b.WriteString(as.GetObjectKind())
b.WriteString(", ObjectName: ")
b.WriteString(as.GetObjectName())
b.WriteString("}")
return b.String()
}

func (as *AggregatedMetric) GetItemList() []Item {
Expand Down
16 changes: 13 additions & 3 deletions pkg/custom-metric/store/data/types/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package types

import (
"fmt"
"bytes"
"math/big"

"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -72,6 +72,7 @@ func (is *SeriesMetric) DeepCopy() Metric {
MetricMetaImp: is.MetricMetaImp.DeepCopy(),
ObjectMetaImp: is.ObjectMetaImp.DeepCopy(),
BasicMetric: is.BasicMetric.DeepCopy(),
Values: make([]*SeriesItem, 0, len(is.Values)),
}
for _, i := range is.Values {
res.Values = append(res.Values, i.DeepCopy().(*SeriesItem))
Expand All @@ -92,8 +93,17 @@ func (is *SeriesMetric) Len() int {
}

func (is *SeriesMetric) String() string {
return fmt.Sprintf("{ObjectNamespace: %v, Name: %v, ObjectKind: %v, ObjectName: %v}",
is.GetObjectNamespace(), is.GetName(), is.GetObjectKind(), is.GetObjectName())
b := bytes.Buffer{}
b.WriteString("{ObjectNamespace: ")
b.WriteString(is.GetObjectNamespace())
b.WriteString(", Name: ")
b.WriteString(is.GetName())
b.WriteString(", ObjectKind: ")
b.WriteString(is.GetObjectKind())
b.WriteString(", ObjectName: ")
b.WriteString(is.GetObjectName())
b.WriteString("}")
return b.String()
}

func (is *SeriesMetric) AddMetric(item *SeriesItem) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/metrics/tag_wrapper_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@ type MetricTagWrapper struct {
var _ MetricEmitter = &MetricTagWrapper{}

func (t *MetricTagWrapper) StoreInt64(key string, val int64, emitType MetricTypeName, tags ...MetricTag) error {
tags = append(tags, t.commonTags...)
tags = append(tags, t.unitTag)
return t.MetricEmitter.StoreInt64(key, val, emitType, tags...)
allTags := make([]MetricTag, 0, len(tags)+len(t.commonTags)+1)
allTags = append(allTags, tags...)
allTags = append(allTags, t.commonTags...)
allTags = append(allTags, t.unitTag)
return t.MetricEmitter.StoreInt64(key, val, emitType, allTags...)
}

func (t *MetricTagWrapper) StoreFloat64(key string, val float64, emitType MetricTypeName, tags ...MetricTag) error {
tags = append(tags, t.commonTags...)
tags = append(tags, t.unitTag)
return t.MetricEmitter.StoreFloat64(key, val, emitType, tags...)
allTags := make([]MetricTag, 0, len(tags)+len(t.commonTags)+1)
allTags = append(allTags, tags...)
allTags = append(allTags, t.commonTags...)
allTags = append(allTags, t.unitTag)
return t.MetricEmitter.StoreFloat64(key, val, emitType, allTags...)
}

func (t *MetricTagWrapper) Run(ctx context.Context) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/general/deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func DeepCopyMap(origin map[string]string) map[string]string {
return nil
}

res := make(map[string]string)
res := make(map[string]string, len(origin))
for key, val := range origin {
res[key] = val
}
Expand All @@ -33,7 +33,7 @@ func DeepCopyFload64Map(origin map[string]float64) map[string]float64 {
return nil
}

res := make(map[string]float64)
res := make(map[string]float64, len(origin))
for key, val := range origin {
res[key] = val
}
Expand Down

0 comments on commit bf2d03c

Please sign in to comment.