Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (sysadvisor): get power data served by metric store sourced from malachite realtime metric server #720

Merged
merged 5 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/katalyst-agent/app/options/metaserver/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewMetricFetcherOptions() *MetricFetcherOptions {
MetricProvisions: []string{metaserver.MetricProvisionerMalachite, metaserver.MetricProvisionerKubelet},

DefaultInterval: time.Second * 5,
ProvisionerIntervalSecs: make(map[string]int),
ProvisionerIntervalSecs: map[string]int{metaserver.MetricProvisionerMalachiteRealtime: 1},

MalachiteOptions: &MalachiteOptions{},
CgroupOptions: &CgroupOptions{},
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/sysadvisor/plugin/poweraware/advisor/advisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import (
)

const (
// 8 seconds between actions since RAPL/HSMP capping needs 4-6 seconds to stabilize itself
intervalSpecFetch = time.Second * 8
// 9 seconds between actions since RAPL/HSMP capping needs 4-6 seconds to stabilize itself
// and malachite realtime metric server imposes delay of up to 2 seconds
intervalSpecFetch = time.Second * 9

metricPowerAwareCurrentPowerInWatt = "power_current_watt"
metricPowerAwareDesiredPowerInWatt = "power_desired_watt"
Expand Down
4 changes: 1 addition & 3 deletions pkg/agent/sysadvisor/plugin/poweraware/power_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ func NewPowerAwarePlugin(
}
}

// todo: use the power usage data from malachite
// we may temporarily have a local reader on top of ipmi (in dev branch), before malachite is ready
powerReader := reader.NewDummyPowerReader()
powerReader := reader.NewMetricStorePowerReader(metaServer)

powerAdvisor := advisor.NewAdvisor(conf.PowerAwarePluginConfiguration.DryRun,
conf.PowerAwarePluginConfiguration.AnnotationKeyPrefix,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reader

import (
"context"
"time"

"github.com/pkg/errors"

"github.com/kubewharf/katalyst-core/pkg/consts"
utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

// malachite realtime power metric server imposes delay of up to 2 seconds coupled with sampling interval of 1 sec
const powerTolerationTime = 3 * time.Second

type nodeMetricGetter interface {
GetNodeMetric(metricName string) (utilmetric.MetricData, error)
}

type metricStorePowerReader struct {
nodeMetricGetter
}

func (m *metricStorePowerReader) Init() error {
return nil
}

func isDataFresh(data utilmetric.MetricData, now time.Time) bool {
if data.Time == nil {
return false
}
return now.Before(data.Time.Add(powerTolerationTime))
}

func (m *metricStorePowerReader) Get(ctx context.Context) (int, error) {
return m.get(ctx, time.Now())
}

func (m *metricStorePowerReader) get(ctx context.Context, now time.Time) (int, error) {
data, err := m.GetNodeMetric(consts.MetricTotalPowerUsedWatts)
if err != nil {
return 0, errors.Wrap(err, "failed to get metric from metric store")
}

if !isDataFresh(data, now) {
return 0, errors.New("power data in metric store is stale")
}

return int(data.Value), nil
}

func (m *metricStorePowerReader) Cleanup() {}

func NewMetricStorePowerReader(nodeMetricGetter nodeMetricGetter) PowerReader {
return &metricStorePowerReader{
nodeMetricGetter: nodeMetricGetter,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reader

import (
"context"
"testing"
"time"

utilmetric "github.com/kubewharf/katalyst-core/pkg/util/metric"
)

func Test_metricStorePowerReader_get(t *testing.T) {
t.Parallel()

setTime := time.Date(2024, 11, 11, 8, 30, 10, 0, time.UTC)
dummyMetricStore := utilmetric.NewMetricStore()
dummyMetricStore.SetNodeMetric("total.power.used.watts", utilmetric.MetricData{
Value: 999,
Time: &setTime,
})

type fields struct {
metricStore *utilmetric.MetricStore
}
type args struct {
ctx context.Context
now time.Time
}
tests := []struct {
name string
fields fields
args args
want int
wantErr bool
}{
{
name: "happy path",
fields: fields{
metricStore: dummyMetricStore,
},
args: args{
ctx: context.TODO(),
now: time.Date(2024, 11, 11, 8, 30, 11, 0, time.UTC),
},
want: 999,
wantErr: false,
},
{
name: "stale data",
fields: fields{
metricStore: dummyMetricStore,
},
args: args{
ctx: context.TODO(),
now: time.Date(2024, 11, 11, 8, 30, 13, 0, time.UTC),
},
want: 0,
wantErr: true,
},
{
name: "no such metric in store",
fields: fields{
metricStore: utilmetric.NewMetricStore(),
},
args: args{
ctx: context.TODO(),
now: time.Date(2024, 11, 11, 8, 30, 13, 0, time.UTC),
},
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
m := NewMetricStorePowerReader(tt.fields.metricStore).(*metricStorePowerReader)
got, err := m.get(tt.args.ctx, tt.args.now)
if (err != nil) != tt.wantErr {
t.Errorf("get() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("get() got = %v, want %v", got, tt.want)
}
})
}
}
9 changes: 5 additions & 4 deletions pkg/config/agent/metaserver/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
)

const (
MetricProvisionerMalachite = "malachite"
MetricProvisionerCgroup = "cgroup"
MetricProvisionerKubelet = "kubelet"
MetricProvisionerRodan = "rodan"
MetricProvisionerMalachite = "malachite"
MetricProvisionerMalachiteRealtime = "malachite_realtime"
MetricProvisionerCgroup = "cgroup"
MetricProvisionerKubelet = "kubelet"
MetricProvisionerRodan = "rodan"
)

type MetricConfiguration struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/consts/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ const (
MetricsNodeFsInodesUsed = "used.inodes.fs.node"
)

// System Power metrics
const (
MetricTotalPowerUsedWatts = "total.power.used.watts"
)

// Image filesystem metrics
const (
MetricsImageFsAvailable = "available.rootfs.system"
Expand Down
1 change: 1 addition & 0 deletions pkg/metaserver/agent/metric/metric_provisoner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func init() {
RegisterProvisioners(metaserver.MetricProvisionerMalachite, malachite.NewMalachiteMetricsProvisioner)
RegisterProvisioners(metaserver.MetricProvisionerMalachiteRealtime, malachite.NewMalachiteRealtimeMetricsProvisioner)
RegisterProvisioners(metaserver.MetricProvisionerKubelet, kubelet.NewKubeletSummaryProvisioner)
RegisterProvisioners(metaserver.MetricProvisionerCgroup, cgroup.NewCGroupMetricsProvisioner)
RegisterProvisioners(metaserver.MetricProvisionerRodan, rodan.NewRodanMetricsProvisioner)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
SystemNetResource = "system/network"
SystemMemoryResource = "system/memory"
SystemComputeResource = "system/compute"

RealtimePowerResource = "realtime/power"
)

type SystemResourceKind int
Expand Down Expand Up @@ -61,6 +63,7 @@ func NewMalachiteClient(fetcher pod.PodFetcher) *MalachiteClient {
SystemNetResource,
SystemComputeResource,
SystemMemoryResource,
RealtimePowerResource,
} {
urls[path] = fmt.Sprintf("http://localhost:%d/api/v1/%s", malachiteServicePort, path)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/kubewharf/katalyst-core/pkg/metaserver/agent/metric/provisioner/malachite/types"
)

func (c *MalachiteClient) GetPowerData() (*types.PowerData, error) {
payload, err := c.getRealtimePowerPayload(RealtimePowerResource)
if err != nil {
return nil, err
}

rsp := &types.MalachitePowerResponse{}
if err := json.Unmarshal(payload, rsp); err != nil {
return nil, fmt.Errorf("failed to unmarshal system compute stats raw data, err %s", err)
}

if rsp.Status != 0 {
return nil, fmt.Errorf("system compute stats status is not ok, %d", rsp.Status)
}

return &rsp.Data, nil
}

func (c *MalachiteClient) getRealtimePowerPayload(resource string) ([]byte, error) {
c.RLock()
defer c.RUnlock()

url, ok := c.urls[resource]
if !ok {
return nil, fmt.Errorf("no url for %v", resource)
}

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to http.NewRequest, url: %s, err %s", url, err)
}

rsp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to http.DefaultClient.Do, url: %s, err %s", req.URL, err)
}

defer func() { _ = rsp.Body.Close() }()
if rsp.StatusCode != 200 {
return nil, fmt.Errorf("invalid http response status code %d, url: %s", rsp.StatusCode, req.URL)
}

return ioutil.ReadAll(rsp.Body)
}
Loading