Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: support NodeMetricCollectPolicy #157

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion apis/slo/v1alpha1/nodemetric_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,18 @@ type ResourceMap struct {
}

// NodeMetricSpec defines the desired state of NodeMetric
type NodeMetricSpec struct{}
type NodeMetricSpec struct {
// CollectPolicy defines the Metric collection policy
CollectPolicy *NodeMetricCollectPolicy `json:"metricCollectPolicy,omitempty"`
}

// NodeMetricCollectPolicy defines the Metric collection policy
type NodeMetricCollectPolicy struct {
// AggregateDurationSeconds represents the aggregation period in seconds
AggregateDurationSeconds *int64 `json:"aggregateDurationSeconds,omitempty"`
// ReportIntervalSeconds represents the report period in seconds
ReportIntervalSeconds *int64 `json:"reportIntervalSeconds,omitempty"`
}

// NodeMetricStatus defines the observed state of NodeMetric
type NodeMetricStatus struct {
Expand Down
32 changes: 31 additions & 1 deletion apis/slo/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions config/crd/bases/slo.koordinator.sh_nodemetrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ spec:
type: object
spec:
description: NodeMetricSpec defines the desired state of NodeMetric
properties:
metricCollectPolicy:
description: CollectPolicy defines the Metric collection policy
properties:
aggregateDurationSeconds:
description: AggregateDurationSeconds represents the aggregation
period in seconds
format: int64
type: integer
reportIntervalSeconds:
description: ReportIntervalSeconds represents the report period
in seconds
format: int64
type: integer
type: object
type: object
status:
description: NodeMetricStatus defines the observed state of NodeMetric
Expand Down
33 changes: 29 additions & 4 deletions pkg/koordlet/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -139,9 +138,8 @@ func (r *reporter) Run(stopCh <-chan struct{}) error {
return fmt.Errorf("timed out waiting for node metric caches to sync")
}

go wait.Until(func() {
r.sync()
}, time.Duration(r.config.ReportIntervalSeconds)*time.Second, stopCh)
go r.syncNodeMetricWorker(stopCh)

} else {
klog.Infof("ReportIntervalSeconds is %d, sync node metric to apiserver is disabled",
r.config.ReportIntervalSeconds)
Expand All @@ -153,6 +151,33 @@ func (r *reporter) Run(stopCh <-chan struct{}) error {
return nil
}

func (r *reporter) syncNodeMetricWorker(stopCh <-chan struct{}) {
reportInterval := r.getNodeMetricReportInterval()
for {
select {
case <-stopCh:
return
case <-time.After(reportInterval):
r.sync()
reportInterval = r.getNodeMetricReportInterval()
}
}
}

func (r *reporter) getNodeMetricReportInterval() time.Duration {
reportInterval := time.Duration(r.config.ReportIntervalSeconds) * time.Second
nodeMetric, err := r.nodeMetricLister.Get(r.nodeName)
if err == nil &&
nodeMetric.Spec.CollectPolicy != nil &&
nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds != nil {
interval := *nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds
if interval > 0 {
reportInterval = time.Duration(interval) * time.Second
}
}
return reportInterval
}

func (r *reporter) sync() {
if !r.isNodeMetricInited() {
klog.Warningf("node metric has not initialized, skip this round.")
Expand Down
40 changes: 40 additions & 0 deletions pkg/koordlet/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@ package reporter

import (
"testing"
"time"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/utils/pointer"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
listerbeta1 "github.com/koordinator-sh/koordinator/pkg/client/listers/slo/v1alpha1"
)

var _ listerbeta1.NodeMetricLister = &fakeNodeMetricLister{}

type fakeNodeMetricLister struct {
nodeMetrics *slov1alpha1.NodeMetric
}

func (f *fakeNodeMetricLister) List(selector labels.Selector) (ret []*slov1alpha1.NodeMetric, err error) {
return []*slov1alpha1.NodeMetric{f.nodeMetrics}, nil
}

func (f *fakeNodeMetricLister) Get(name string) (*slov1alpha1.NodeMetric, error) {
return f.nodeMetrics, nil
}

func Test_reporter_isNodeMetricInited(t *testing.T) {
type fields struct {
nodeMetric *slov1alpha1.NodeMetric
Expand Down Expand Up @@ -50,3 +69,24 @@ func Test_reporter_isNodeMetricInited(t *testing.T) {
})
}
}

func Test_getNodeMetricReportInterval(t *testing.T) {
nodeMetricLister := &fakeNodeMetricLister{
nodeMetrics: &slov1alpha1.NodeMetric{
Spec: slov1alpha1.NodeMetricSpec{
CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{
ReportIntervalSeconds: pointer.Int64(666),
},
},
},
}
r := &reporter{
config: NewDefaultConfig(),
nodeMetricLister: nodeMetricLister,
}
reportInterval := r.getNodeMetricReportInterval()
expectReportInterval := 666 * time.Second
if reportInterval != expectReportInterval {
t.Errorf("expect reportInterval %d but got %d", expectReportInterval, reportInterval)
}
}
30 changes: 18 additions & 12 deletions pkg/slo-controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ type NodeResourceQoSStrategy struct {
}

type ColocationStrategy struct {
Enable *bool `json:"enable,omitempty"`
CPUReclaimThresholdPercent *int64 `json:"cpuReclaimThresholdPercent,omitempty"`
MemoryReclaimThresholdPercent *int64 `json:"memoryReclaimThresholdPercent,omitempty"`
DegradeTimeMinutes *int64 `json:"degradeTimeMinutes,omitempty"`
UpdateTimeThresholdSeconds *int64 `json:"updateTimeThresholdSeconds,omitempty"`
ResourceDiffThreshold *float64 `json:"resourceDiffThreshold,omitempty"`
Enable *bool `json:"enable,omitempty"`
MetricAggregateDurationSeconds *int64 `json:"metricAggregateDurationSeconds,omitempty"`
MetricReportIntervalSeconds *int64 `json:"metricReportIntervalSeconds,omitempty"`
CPUReclaimThresholdPercent *int64 `json:"cpuReclaimThresholdPercent,omitempty"`
MemoryReclaimThresholdPercent *int64 `json:"memoryReclaimThresholdPercent,omitempty"`
DegradeTimeMinutes *int64 `json:"degradeTimeMinutes,omitempty"`
UpdateTimeThresholdSeconds *int64 `json:"updateTimeThresholdSeconds,omitempty"`
ResourceDiffThreshold *float64 `json:"resourceDiffThreshold,omitempty"`
}

func NewDefaultColocationCfg() *ColocationCfg {
Expand All @@ -94,17 +96,21 @@ func DefaultColocationCfg() ColocationCfg {

func DefaultColocationStrategy() ColocationStrategy {
return ColocationStrategy{
Enable: pointer.Bool(false),
CPUReclaimThresholdPercent: pointer.Int64(60),
MemoryReclaimThresholdPercent: pointer.Int64(65),
DegradeTimeMinutes: pointer.Int64(15),
UpdateTimeThresholdSeconds: pointer.Int64(300),
ResourceDiffThreshold: pointer.Float64(0.1),
Enable: pointer.Bool(false),
MetricAggregateDurationSeconds: pointer.Int64(30),
MetricReportIntervalSeconds: pointer.Int64(60),
CPUReclaimThresholdPercent: pointer.Int64(60),
MemoryReclaimThresholdPercent: pointer.Int64(65),
DegradeTimeMinutes: pointer.Int64(15),
UpdateTimeThresholdSeconds: pointer.Int64(300),
ResourceDiffThreshold: pointer.Float64(0.1),
}
}

func IsColocationStrategyValid(strategy *ColocationStrategy) bool {
return strategy != nil &&
(strategy.MetricAggregateDurationSeconds == nil || *strategy.MetricReportIntervalSeconds > 0) &&
(strategy.MetricReportIntervalSeconds == nil || *strategy.MetricReportIntervalSeconds > 0) &&
(strategy.CPUReclaimThresholdPercent == nil || *strategy.CPUReclaimThresholdPercent > 0) &&
(strategy.MemoryReclaimThresholdPercent == nil || *strategy.MemoryReclaimThresholdPercent > 0) &&
(strategy.DegradeTimeMinutes == nil || *strategy.DegradeTimeMinutes > 0) &&
Expand Down
14 changes: 8 additions & 6 deletions pkg/slo-controller/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ func Test_GetNodeColocationStrategy(t *testing.T) {
},
},
want: &ColocationStrategy{
Enable: pointer.BoolPtr(true),
CPUReclaimThresholdPercent: pointer.Int64Ptr(60),
MemoryReclaimThresholdPercent: pointer.Int64Ptr(65),
DegradeTimeMinutes: pointer.Int64Ptr(15),
UpdateTimeThresholdSeconds: pointer.Int64Ptr(300),
ResourceDiffThreshold: pointer.Float64Ptr(0.1),
Enable: pointer.BoolPtr(true),
MetricAggregateDurationSeconds: pointer.Int64Ptr(30),
MetricReportIntervalSeconds: pointer.Int64Ptr(60),
CPUReclaimThresholdPercent: pointer.Int64Ptr(60),
MemoryReclaimThresholdPercent: pointer.Int64Ptr(65),
DegradeTimeMinutes: pointer.Int64Ptr(15),
UpdateTimeThresholdSeconds: pointer.Int64Ptr(300),
ResourceDiffThreshold: pointer.Float64Ptr(0.1),
},
},
{
Expand Down
54 changes: 54 additions & 0 deletions pkg/slo-controller/nodemetric/collect_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package nodemetric

import (
"fmt"

corev1 "k8s.io/api/core/v1"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/config"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource"
)

func getNodeMetricCollectPolicy(node *corev1.Node, colocationConfig *noderesource.Config) (*slov1alpha1.NodeMetricCollectPolicy, error) {
colocationConfig.RLock()
defer colocationConfig.RUnlock()

strategy := config.GetNodeColocationStrategy(&colocationConfig.ColocationCfg, node)
if strategy == nil {
strategy = &colocationConfig.ColocationStrategy
}
if strategy == nil {
return nil, fmt.Errorf("failed to find satisfied strategy")
}

if !config.IsColocationStrategyValid(strategy) {
return nil, fmt.Errorf("invalid colocationConfig")
}

if strategy.Enable == nil || !*strategy.Enable {
return nil, fmt.Errorf("colocationConfig disabled")
}

collectPolicy := &slov1alpha1.NodeMetricCollectPolicy{
AggregateDurationSeconds: strategy.MetricAggregateDurationSeconds,
ReportIntervalSeconds: strategy.MetricReportIntervalSeconds,
}
return collectPolicy, nil
}
Loading