Skip to content

Commit

Permalink
decode metric list according to whether the metric name is an aggregator metric
Browse files Browse the repository at this point in the history
  • Loading branch information
luomingmeng committed Oct 18, 2023
1 parent 5652340 commit c9990c5
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 68 deletions.
11 changes: 6 additions & 5 deletions pkg/custom-metric/store/data/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,15 @@ func (c *CachedMetric) AddAggregatedMetric(aList ...types.Metric) {
continue
}

if _, ok := c.metricMap[d.MetricMetaImp]; !ok {
c.metricMap[d.MetricMetaImp] = make(map[types.ObjectMeta]*internalMetricImp)
baseMetricMetaImp := d.GetBaseMetricMetaImp()
if _, ok := c.metricMap[baseMetricMetaImp]; !ok {
c.metricMap[baseMetricMetaImp] = make(map[types.ObjectMeta]*internalMetricImp)
}
if _, ok := c.metricMap[d.MetricMetaImp][d.ObjectMetaImp]; !ok {
c.metricMap[d.MetricMetaImp][d.ObjectMetaImp] = newInternalMetric(d.MetricMetaImp, d.ObjectMetaImp, d.BasicMetric)
if _, ok := c.metricMap[baseMetricMetaImp][d.ObjectMetaImp]; !ok {
c.metricMap[baseMetricMetaImp][d.ObjectMetaImp] = newInternalMetric(baseMetricMetaImp, d.ObjectMetaImp, d.BasicMetric)
}

c.metricMap[d.MetricMetaImp][d.ObjectMetaImp].mergeAggregatedMetric(d)
c.metricMap[baseMetricMetaImp][d.ObjectMetaImp].mergeAggregatedMetric(d)
}
}

Expand Down
113 changes: 107 additions & 6 deletions pkg/custom-metric/store/data/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,31 +480,132 @@ func TestCache(t *testing.T) {
Timestamp: 12 * time.Second.Milliseconds(),
WindowSeconds: 8,
},
Values: map[string]float64{},
}

agg.Values = map[string]float64{"m-3" + metric.AggregateFunctionAvg: 8.25}
agg.Name = "m-3" + metric.AggregateFunctionAvg
agg.Value = 8.25
metricList, exist = c.GetMetric("n-3", "m-3"+metric.AggregateFunctionAvg, "pod-5", &schema.GroupResource{Resource: "pod"}, false)
assert.Equal(t, true, exist)
assert.ElementsMatch(t, []*types.AggregatedMetric{agg}, metricList)

agg.Values = map[string]float64{"m-3" + metric.AggregateFunctionMax: 12}
agg.Name = "m-3" + metric.AggregateFunctionMax
agg.Value = 12
metricList, exist = c.GetMetric("n-3", "m-3"+metric.AggregateFunctionMax, "pod-5", &schema.GroupResource{Resource: "pod"}, false)
assert.Equal(t, true, exist)
assert.ElementsMatch(t, []*types.AggregatedMetric{agg}, metricList)

agg.Values = map[string]float64{"m-3" + metric.AggregateFunctionMin: 4}
agg.Name = "m-3" + metric.AggregateFunctionMin
agg.Value = 4
metricList, exist = c.GetMetric("n-3", "m-3"+metric.AggregateFunctionMin, "pod-5", &schema.GroupResource{Resource: "pod"}, false)
assert.Equal(t, true, exist)
assert.ElementsMatch(t, []*types.AggregatedMetric{agg}, metricList)

agg.Values = map[string]float64{"m-3" + metric.AggregateFunctionP99: 11}
agg.Name = "m-3" + metric.AggregateFunctionP99
agg.Value = 11
metricList, exist = c.GetMetric("n-3", "m-3"+metric.AggregateFunctionP99, "pod-5", &schema.GroupResource{Resource: "pod"}, false)
assert.Equal(t, true, exist)
assert.ElementsMatch(t, []*types.AggregatedMetric{agg}, metricList)

agg.Values = map[string]float64{"m-3" + metric.AggregateFunctionP90: 11}
agg.Name = "m-3" + metric.AggregateFunctionP90
agg.Value = 11
metricList, exist = c.GetMetric("n-3", "m-3"+metric.AggregateFunctionP90, "pod-5", &schema.GroupResource{Resource: "pod"}, false)
assert.Equal(t, true, exist)
assert.ElementsMatch(t, []*types.AggregatedMetric{agg}, metricList)
}

// TestMergeInternalMetricList verifies that MergeInternalMetricList collapses
// duplicate aggregated metrics (same meta, object, identity and value) coming
// from multiple input lists into a single entry.
func TestMergeInternalMetricList(t *testing.T) {
	type args struct {
		metricName  string
		metricLists [][]types.Metric
	}
	tests := []struct {
		name string
		args args
		want []types.Metric
	}{
		{
			// Previously this case had no name, so t.Run produced an
			// anonymous, index-based subtest; naming it makes failures traceable.
			name: "duplicated aggregated metrics are merged into one",
			args: args{
				metricName: "pod_cpu_usage_agg_max",
				metricLists: [][]types.Metric{
					{
						&types.AggregatedMetric{
							MetricMetaImp: types.MetricMetaImp{
								Name:       "pod_cpu_usage_agg_max",
								Namespaced: true,
								ObjectKind: "pod",
							},
							ObjectMetaImp: types.ObjectMetaImp{
								ObjectNamespace: "n-1",
								ObjectName:      "pod-1",
							},
							BasicMetric: types.BasicMetric{
								Labels: map[string]string{
									"Name": "m-1",
								},
							},
							AggregatedIdentity: types.AggregatedIdentity{
								Count:         4,
								Timestamp:     12 * time.Second.Milliseconds(),
								WindowSeconds: 8,
							},
							Value: 10,
						},
					},
					{
						&types.AggregatedMetric{
							MetricMetaImp: types.MetricMetaImp{
								Name:       "pod_cpu_usage_agg_max",
								Namespaced: true,
								ObjectKind: "pod",
							},
							ObjectMetaImp: types.ObjectMetaImp{
								ObjectNamespace: "n-1",
								ObjectName:      "pod-1",
							},
							BasicMetric: types.BasicMetric{
								Labels: map[string]string{
									"Name": "m-1",
								},
							},
							AggregatedIdentity: types.AggregatedIdentity{
								Count:         4,
								Timestamp:     12 * time.Second.Milliseconds(),
								WindowSeconds: 8,
							},
							Value: 10,
						},
					},
				},
			},
			// The two identical inputs must merge into exactly one metric.
			want: []types.Metric{
				&types.AggregatedMetric{
					MetricMetaImp: types.MetricMetaImp{
						Name:       "pod_cpu_usage_agg_max",
						Namespaced: true,
						ObjectKind: "pod",
					},
					ObjectMetaImp: types.ObjectMetaImp{
						ObjectNamespace: "n-1",
						ObjectName:      "pod-1",
					},
					BasicMetric: types.BasicMetric{
						Labels: map[string]string{
							"Name": "m-1",
						},
					},
					AggregatedIdentity: types.AggregatedIdentity{
						Count:         4,
						Timestamp:     12 * time.Second.Milliseconds(),
						WindowSeconds: 8,
					},
					Value: 10,
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			assert.Equalf(t, tt.want, MergeInternalMetricList(tt.args.metricName, tt.args.metricLists...), "MergeInternalMetricList(%v, %v)", tt.args.metricName, tt.args.metricLists)
		})
	}
}
48 changes: 24 additions & 24 deletions pkg/custom-metric/store/data/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type internalMetricImp struct {
timestampSets map[int64]interface{}
seriesMetric *types.SeriesMetric

aggregatedMetric *types.AggregatedMetric
*types.AggregatedIdentity
aggregatedMetric map[string]*types.AggregatedMetric
}

func newInternalMetric(m types.MetricMetaImp, o types.ObjectMetaImp, b types.BasicMetric) *internalMetricImp {
Expand All @@ -53,7 +54,7 @@ func newInternalMetric(m types.MetricMetaImp, o types.ObjectMetaImp, b types.Bas
BasicMetric: b,
timestampSets: make(map[int64]interface{}),
seriesMetric: types.NewSeriesMetric(),
aggregatedMetric: types.NewAggregatedInternalMetric(),
aggregatedMetric: make(map[string]*types.AggregatedMetric),
}
}

Expand All @@ -73,16 +74,19 @@ func (a *internalMetricImp) getSeriesItems(latest bool) (types.Metric, bool) {
}

func (a *internalMetricImp) getAggregatedItems(agg string) (types.Metric, bool) {
v, ok := a.aggregatedMetric.Values[agg]
v, ok := a.aggregatedMetric[agg]
if !ok {
return nil, false
}

res := a.aggregatedMetric.DeepCopy().(*types.AggregatedMetric)
res.MetricMetaImp = a.MetricMetaImp.DeepCopy()
res := v.DeepCopy().(*types.AggregatedMetric)
res.MetricMetaImp = types.AggregatorMetricMetaImp(a.MetricMetaImp, agg)
res.ObjectMetaImp = a.ObjectMetaImp.DeepCopy()
res.BasicMetric = a.BasicMetric.DeepCopy()
res.Values = map[string]float64{a.GetName() + agg: v}
if a.AggregatedIdentity != nil {
res.AggregatedIdentity = *a.AggregatedIdentity
}

return res, true
}

Expand Down Expand Up @@ -119,21 +123,15 @@ func (a *internalMetricImp) addSeriesMetric(is *types.SeriesMetric) []*types.Ser
}

func (a *internalMetricImp) mergeAggregatedMetric(as *types.AggregatedMetric) {
if len(a.aggregatedMetric.Values) == 0 {
a.aggregatedMetric = as
return
}

for k, v := range as.Values {
if _, ok := a.aggregatedMetric.Values[k]; !ok {
a.aggregatedMetric.Values[k] = v
}
_, aggName := types.ParseAggregator(as.GetName())
if _, ok := a.aggregatedMetric[aggName]; !ok {
a.aggregatedMetric[aggName] = as
}
}

// aggregateMetric calculate the aggregated metric based on snapshot of current store
func (a *internalMetricImp) aggregateMetric() {
a.aggregatedMetric.Values = make(map[string]float64)
a.aggregatedMetric = make(map[string]*types.AggregatedMetric)
if len(a.seriesMetric.Values) <= 0 {
return
}
Expand All @@ -149,23 +147,25 @@ func (a *internalMetricImp) aggregateMetric() {
statsData = append(statsData, item.Value)
}

a.aggregatedMetric.Count = int64(a.seriesMetric.Len())
a.aggregatedMetric.Timestamp = a.seriesMetric.Values[0].Timestamp
a.aggregatedMetric.WindowSeconds = (a.seriesMetric.Values[a.seriesMetric.Len()-1].Timestamp - a.seriesMetric.Values[0].Timestamp) / time.Second.Milliseconds()
a.aggregatedMetric.Values[apimetric.AggregateFunctionMax] = max
a.aggregatedMetric.Values[apimetric.AggregateFunctionMin] = min
a.aggregatedMetric.Values[apimetric.AggregateFunctionAvg] = sum / float64(a.seriesMetric.Len())
a.AggregatedIdentity = &types.AggregatedIdentity{
Count: int64(a.seriesMetric.Len()),
Timestamp: a.seriesMetric.Values[0].Timestamp,
WindowSeconds: (a.seriesMetric.Values[a.seriesMetric.Len()-1].Timestamp - a.seriesMetric.Values[0].Timestamp) / time.Second.Milliseconds(),
}
a.aggregatedMetric[apimetric.AggregateFunctionMax] = types.NewAggregatedInternalMetric(max)
a.aggregatedMetric[apimetric.AggregateFunctionMin] = types.NewAggregatedInternalMetric(min)
a.aggregatedMetric[apimetric.AggregateFunctionAvg] = types.NewAggregatedInternalMetric(sum / float64(a.seriesMetric.Len()))

if p99, err := statsData.Percentile(99); err != nil {
general.Errorf("failed to get stats p99: %v", err)
} else {
a.aggregatedMetric.Values[apimetric.AggregateFunctionP99] = p99
a.aggregatedMetric[apimetric.AggregateFunctionP99] = types.NewAggregatedInternalMetric(p99)
}

if p90, err := statsData.Percentile(90); err != nil {
general.Errorf("failed to get stats p90: %v", err)
} else {
a.aggregatedMetric.Values[apimetric.AggregateFunctionP90] = p90
a.aggregatedMetric[apimetric.AggregateFunctionP90] = types.NewAggregatedInternalMetric(p90)
}
}

Expand Down
51 changes: 36 additions & 15 deletions pkg/custom-metric/store/data/types/aggragated.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/kubewharf/katalyst-api/pkg/metric"
"github.com/kubewharf/katalyst-core/pkg/util/general"
)

var validAggregatorSuffixList = []string{
Expand Down Expand Up @@ -72,14 +71,14 @@ type AggregatedMetric struct {
BasicMetric `json:",inline"`

AggregatedIdentity `json:",inline"`
Values map[string]float64 `json:"values,omitempty"`
Value float64 `json:"values,omitempty"`
}

var _ Metric = &AggregatedMetric{}

func NewAggregatedInternalMetric() *AggregatedMetric {
func NewAggregatedInternalMetric(value float64) *AggregatedMetric {
return &AggregatedMetric{
Values: make(map[string]float64),
Value: value,
}
}

Expand All @@ -90,12 +89,21 @@ func (as *AggregatedMetric) DeepCopy() Metric {
BasicMetric: as.BasicMetric.DeepCopy(),

AggregatedIdentity: as.AggregatedIdentity,
Values: general.DeepCopyFload64Map(as.Values),
Value: as.Value,
}
}

// GetBaseMetricMetaImp strips the aggregator suffix from this metric's name
// and returns a MetricMetaImp describing the underlying base metric, keeping
// the original namespaced flag and object kind.
func (as *AggregatedMetric) GetBaseMetricMetaImp() MetricMetaImp {
	baseName, _ := ParseAggregator(as.GetName())
	meta := MetricMetaImp{
		Name:       baseName,
		Namespaced: as.Namespaced,
		ObjectKind: as.ObjectKind,
	}
	return meta
}

func (as *AggregatedMetric) Len() int {
return len(as.Values)
return 1
}

func (as *AggregatedMetric) String() string {
Expand All @@ -104,16 +112,20 @@ func (as *AggregatedMetric) String() string {
}

func (as *AggregatedMetric) GetItemList() []Item {
var res []Item
for k, v := range as.Values {
if len(k) > 0 {
res = append(res, &AggregatedItem{
AggregatedIdentity: as.AggregatedIdentity,
Value: v,
})
}
return []Item{
&AggregatedItem{
AggregatedIdentity: as.AggregatedIdentity,
Value: as.Value,
},
}
}

func AggregatorMetricMetaImp(origin MetricMetaImp, aggName string) MetricMetaImp {
return MetricMetaImp{
Name: origin.GetName() + aggName,
Namespaced: origin.GetNamespaced(),
ObjectKind: origin.GetObjectKind(),
}
return res
}

// ParseAggregator parses the given metricName into aggregator-suffix and origin-metric,
Expand All @@ -126,3 +138,12 @@ func ParseAggregator(metricName string) (string, string) {
}
return metricName, ""
}

// IsAggregatorMetric reports whether the given metric name carries a known
// aggregator suffix (i.e. it refers to an aggregated metric rather than a
// raw series metric).
func IsAggregatorMetric(metricName string) bool {
	// ParseAggregator returns a non-empty aggregator suffix only when the
	// name matches one of the valid aggregator suffixes.
	_, agg := ParseAggregator(metricName)
	return len(agg) > 0
}
34 changes: 18 additions & 16 deletions pkg/custom-metric/store/data/types/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,27 @@ func UnmarshalMetricList(bytes []byte) (res []Metric, err error) {
return nil, fmt.Errorf("bytes can be unmarshalled into neither series metric nor aggregated metric")
}

func DecodeMetricList(body io.ReadCloser) (res []Metric, err error) {
var sList []*SeriesMetric
if err = json.NewDecoder(body).Decode(&sList); err == nil {
for _, s := range sList {
res = append(res, s)
func DecodeMetricList(body io.ReadCloser, metricName string) (res []Metric, err error) {
if IsAggregatorMetric(metricName) {
var aList []*AggregatedMetric
if err = json.NewDecoder(body).Decode(&aList); err == nil {
for _, a := range aList {
res = append(res, a)
}
return
} else {
klog.Infof("bytes decoded into aggregated metric err: %v", err)
}
return
} else {
klog.Infof("bytes decoded into series metric err: %v", err)
}

var aList []*AggregatedMetric
if err = json.NewDecoder(body).Decode(&aList); err == nil {
for _, a := range aList {
res = append(res, a)
var sList []*SeriesMetric
if err = json.NewDecoder(body).Decode(&sList); err == nil {
for _, s := range sList {
res = append(res, s)
}
return
} else {
klog.Infof("bytes decoded into series metric err: %v", err)
}
return
} else {
klog.Infof("bytes decoded into aggregated metric err: %v", err)
}

return nil, fmt.Errorf("bytes can be decoded into neither series metric nor aggregated metric")
Expand Down
Loading

0 comments on commit c9990c5

Please sign in to comment.