Skip to content

Commit

Permalink
supplement metric for rss, cache, cpu-usage and expand the ability fo…
Browse files Browse the repository at this point in the history
…r external metric (#131)
  • Loading branch information
waynepeking348 authored Jul 6, 2023
1 parent 5424c58 commit fa3b08b
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 48 deletions.
8 changes: 8 additions & 0 deletions pkg/consts/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package consts

import "math"

const (
// OwnerReferenceIndex is the lookup name for the index function
OwnerReferenceIndex = "owner-reference-index"
Expand Down Expand Up @@ -58,3 +60,9 @@ const (
ObjectFieldNameSpec = "spec"
ObjectFieldNameStatus = "status"
)

// Exponential decay factors of the form 1/e^(5/T), i.e. e^(-5/T), for
// T = 60, 300 and 900.
// NOTE(review): presumably these are the smoothing factors for 1-, 5- and
// 15-minute moving averages sampled every 5 seconds (the same shape as the
// classic load-average constants) — confirm against the callers.
var (
// EXP1 is 1/e^(5/60).
EXP1 = 1.0 / math.Exp(5.0/60.0)
// EXP5 is 1/e^(5/300).
EXP5 = 1.0 / math.Exp(5.0/300.0)
// EXP15 is 1/e^(5/900).
EXP15 = 1.0 / math.Exp(5.0/900.0)
)
9 changes: 4 additions & 5 deletions pkg/consts/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ const (

// Cgroup cpu metrics
const (
MetricCPULimitContainer = "cpu.limit.container"
MetricCPUUsageContainer = "cpu.usage.container"
MetricCPUUsageRatioContainer = "cpu.usage.ratio.container"
MetricCPUUsageUserContainer = "cpu.usage.user.container"
MetricCPUUsageSysContainer = "cpu.usage.sys.container"
MetricCPULimitContainer = "cpu.limit.container"
MetricCPUUsageContainer = "cpu.usage.container"
MetricCPUUsageUserContainer = "cpu.usage.user.container"
MetricCPUUsageSysContainer = "cpu.usage.sys.container"

MetricCPUShareContainer = "cpu.share.container"
MetricCPUQuotaContainer = "cpu.quota.container"
Expand Down
2 changes: 2 additions & 0 deletions pkg/metaserver/agent/metric/fake_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (f *FakeMetricsFetcher) RegisterNotifier(scope MetricsScope, req NotifiedRe

// DeRegisterNotifier is a no-op in the fake fetcher: scope and key are ignored.
func (f *FakeMetricsFetcher) DeRegisterNotifier(scope MetricsScope, key string) {}

// RegisterExternalMetric is a no-op in the fake fetcher: the supplied function
// is discarded and never invoked by this method.
func (f *FakeMetricsFetcher) RegisterExternalMetric(_ func(store *metric.MetricStore)) {}

// GetNodeMetric returns the node-level metric named metricName by delegating
// directly to the fake fetcher's metric store; the store's result and error
// are returned unchanged.
func (f *FakeMetricsFetcher) GetNodeMetric(metricName string) (metric.MetricData, error) {
return f.metricStore.GetNodeMetric(metricName)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/metaserver/agent/metric/malachite/cgroup/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ type CPUBasicInfo struct {
UpdateTime uint64 `json:"update_time"`
}

// CPUCgDataV1
// for legacy reasons, the usage fields below are expressed in cores rather than as ratios,
// so the collector needs to transform them; this affects the following fields:
// cpu_usage_ratio, cpu_user_usage_ratio, cpu_sys_usage_ratio
type CPUCgDataV1 struct {
FullPath string `json:"full_path"`
CfsQuotaUs int64 `json:"cfs_quota_us"`
Expand Down Expand Up @@ -315,14 +318,17 @@ type BlkIOCgDataV2 struct {
UpdateTime int64 `json:"update_time"`
}

// CPUCgDataV2
// for legacy reasons, the usage fields below are expressed in cores rather than as ratios,
// so the collector needs to transform them; this affects the following fields:
// cpu_usage_ratio, cpu_user_usage_ratio, cpu_sys_usage_ratio
type CPUCgDataV2 struct {
FullPath string `json:"full_path"`
CPUStats CPUStats `json:"cpu_stats"`
CPUPressure system.Pressure `json:"cpu_pressure"`
Weight int `json:"weight"`
WeightNice int `json:"weight_nice"`
MaxBurst int `json:"max_burst"`
Max uint64 `json:"max"` //18446744073709551615(u64_max) means unlimited
Max uint64 `json:"max"` // 18446744073709551615(u64_max) means unlimited
MaxPeriod int64 `json:"max_period"`
CPUUsageRatio float64 `json:"cpu_usage_ratio"`
CPUUserUsageRatio float64 `json:"cpu_user_usage_ratio"`
Expand Down
14 changes: 9 additions & 5 deletions pkg/metaserver/agent/metric/malachite/system/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type DiskIo struct {
}

type SystemDiskIoData struct {
DiskIo []DiskIo `json:"disk_io"`
DiskIo []DiskIo `json:"disk_io"`
UpdateTime int64 `json:"update_time"`
}

type MalachiteSystemNetworkResponse struct {
Expand All @@ -42,6 +43,7 @@ type MalachiteSystemNetworkResponse struct {
// SystemNetworkData holds system-level network data decoded from JSON.
// NOTE(review): presumably the data payload of MalachiteSystemNetworkResponse — confirm.
type SystemNetworkData struct {
// NetworkCard is the list decoded from the "networkcard" key.
NetworkCard []NetworkCard `json:"networkcard"`
// TCP is decoded from the "tcp" key.
TCP TCP `json:"tcp"`
// UpdateTime is decoded from the "update_time" key.
// NOTE(review): presumably Unix seconds, like the UpdateTime of the other
// system data types that the fetcher converts with time.Unix(..., 0) — confirm.
UpdateTime int64 `json:"update_time"`
}

type NetworkCard struct {
Expand Down Expand Up @@ -84,8 +86,9 @@ type MalachiteSystemComputeResponse struct {
}

type SystemComputeData struct {
Load Load `json:"load"`
CPU []CPU `json:"cpu"`
Load Load `json:"load"`
CPU []CPU `json:"cpu"`
UpdateTime int64 `json:"update_time"`
}

type Load struct {
Expand Down Expand Up @@ -116,8 +119,9 @@ type MalachiteSystemMemoryResponse struct {
}

type SystemMemoryData struct {
System System `json:"system"`
Numa []Numa `json:"numa"`
System System `json:"system"`
Numa []Numa `json:"numa"`
UpdateTime int64 `json:"update_time"`
}

type System struct {
Expand Down
82 changes: 45 additions & 37 deletions pkg/metaserver/agent/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type MetricsFetcher interface {
RegisterNotifier(scope MetricsScope, req NotifiedRequest, response chan NotifiedResponse) string
DeRegisterNotifier(scope MetricsScope, key string)

// RegisterExternalMetric register a function to set metric that can
// only be obtained from external sources
RegisterExternalMetric(f func(store *metric.MetricStore))

// GetNodeMetric get metric of node.
GetNodeMetric(metricName string) (metric.MetricData, error)
// GetNumaMetric get metric of numa.
Expand Down Expand Up @@ -123,7 +127,7 @@ func NewMalachiteMetricsFetcher(emitter metrics.MetricEmitter) MetricsFetcher {
return &MalachiteMetricsFetcher{
metricStore: metric.GetMetricStoreInstance(),
emitter: emitter,
registered: map[MetricsScope]map[string]NotifiedData{
registeredNotifier: map[MetricsScope]map[string]NotifiedData{
MetricsScopeNode: make(map[string]NotifiedData),
MetricsScopeNuma: make(map[string]NotifiedData),
MetricsScopeCPU: make(map[string]NotifiedData),
Expand All @@ -135,10 +139,12 @@ func NewMalachiteMetricsFetcher(emitter metrics.MetricEmitter) MetricsFetcher {

type MalachiteMetricsFetcher struct {
metricStore *metric.MetricStore
emitter metrics.MetricEmitter

registeredMetric []func(store *metric.MetricStore)
registeredNotifier map[MetricsScope]map[string]NotifiedData

sync.RWMutex
registered map[MetricsScope]map[string]NotifiedData
emitter metrics.MetricEmitter
}

func (m *MalachiteMetricsFetcher) Run(ctx context.Context) {
Expand All @@ -147,8 +153,9 @@ func (m *MalachiteMetricsFetcher) Run(ctx context.Context) {
})
}

func (m *MalachiteMetricsFetcher) RegisterNotifier(scope MetricsScope, req NotifiedRequest, response chan NotifiedResponse) string {
if _, ok := m.registered[scope]; !ok {
func (m *MalachiteMetricsFetcher) RegisterNotifier(scope MetricsScope, req NotifiedRequest,
response chan NotifiedResponse) string {
if _, ok := m.registeredNotifier[scope]; !ok {
return ""
}

Expand All @@ -159,7 +166,7 @@ func (m *MalachiteMetricsFetcher) RegisterNotifier(scope MetricsScope, req Notif
rand.Read(randBytes)
key := string(randBytes)

m.registered[scope][key] = NotifiedData{
m.registeredNotifier[scope][key] = NotifiedData{
scope: scope,
req: req,
response: response,
Expand All @@ -171,7 +178,11 @@ func (m *MalachiteMetricsFetcher) DeRegisterNotifier(scope MetricsScope, key str
m.Lock()
defer m.Unlock()

delete(m.registered[scope], key)
delete(m.registeredNotifier[scope], key)
}

// RegisterExternalMetric registers f, a function that sets metrics which can
// only be obtained from external sources. Registered functions are invoked
// with the fetcher's metric store, in registration order, during each
// sampling round after system and pod data have been updated.
//
// A nil f is ignored, since calling it during sampling would panic.
//
// The fetcher's lock is held while appending, matching the notifier
// registration methods, so concurrent registrations do not race on the slice.
func (m *MalachiteMetricsFetcher) RegisterExternalMetric(f func(store *metric.MetricStore)) {
	if f == nil {
		return
	}

	m.Lock()
	defer m.Unlock()

	m.registeredMetric = append(m.registeredMetric, f)
}

func (m *MalachiteMetricsFetcher) GetNodeMetric(metricName string) (metric.MetricData, error) {
Expand Down Expand Up @@ -221,9 +232,15 @@ func (m *MalachiteMetricsFetcher) sample() {

// Update system data
m.updateSystemStats()

// Update pod data
m.updatePodsCgroupData()

// after sampling, we should call the registered function to get external metric
for _, f := range m.registeredMetric {
f(m.metricStore)
}
m.notifySystem()
m.notifyPods()
}

// checkMalachiteHealthy is to check whether malachite is healthy
Expand Down Expand Up @@ -268,8 +285,6 @@ func (m *MalachiteMetricsFetcher) updateSystemStats() {
} else {
m.processSystemIOData(systemIOData)
}

m.notifySystem()
}

// Get raw cgroup data by malachite sdk and set container metrics to metricStore, GC not existed pod metrics
Expand All @@ -293,8 +308,6 @@ func (m *MalachiteMetricsFetcher) updatePodsCgroupData() {
}
}
m.metricStore.GCPodsMetric(podUIDSet)

m.notifyPods()
}

// notifySystem notifies system-related data
Expand All @@ -303,7 +316,7 @@ func (m *MalachiteMetricsFetcher) notifySystem() {
m.RLock()
defer m.RUnlock()

for _, reg := range m.registered[MetricsScopeNode] {
for _, reg := range m.registeredNotifier[MetricsScopeNode] {
v, err := m.metricStore.GetNodeMetric(reg.req.MetricName)
if err != nil {
continue
Expand All @@ -316,7 +329,7 @@ func (m *MalachiteMetricsFetcher) notifySystem() {
}
}

for _, reg := range m.registered[MetricsScopeDevice] {
for _, reg := range m.registeredNotifier[MetricsScopeDevice] {
v, err := m.metricStore.GetDeviceMetric(reg.req.DeviceID, reg.req.MetricName)
if err != nil {
continue
Expand All @@ -329,7 +342,7 @@ func (m *MalachiteMetricsFetcher) notifySystem() {
}
}

for _, reg := range m.registered[MetricsScopeNuma] {
for _, reg := range m.registeredNotifier[MetricsScopeNuma] {
v, err := m.metricStore.GetNumaMetric(reg.req.NumaID, reg.req.MetricName)
if err != nil {
continue
Expand All @@ -342,7 +355,7 @@ func (m *MalachiteMetricsFetcher) notifySystem() {
}
}

for _, reg := range m.registered[MetricsScopeCPU] {
for _, reg := range m.registeredNotifier[MetricsScopeCPU] {
v, err := m.metricStore.GetCPUMetric(reg.req.CoreID, reg.req.MetricName)
if err != nil {
continue
Expand All @@ -362,7 +375,7 @@ func (m *MalachiteMetricsFetcher) notifyPods() {
m.RLock()
defer m.RUnlock()

for _, reg := range m.registered[MetricsScopeContainer] {
for _, reg := range m.registeredNotifier[MetricsScopeContainer] {
v, err := m.metricStore.GetContainerMetric(reg.req.PodUID, reg.req.ContainerName, reg.req.MetricName)
if err != nil {
continue
Expand Down Expand Up @@ -392,8 +405,8 @@ func (m *MalachiteMetricsFetcher) notifyPods() {
}

func (m *MalachiteMetricsFetcher) processSystemComputeData(systemComputeData *system.SystemComputeData) {
// todo, currently there exits no updateTime, so we will use current-time instead
updateTime := time.Now()
// todo, currently we only get a unified data for the whole system compute data
updateTime := time.Unix(systemComputeData.UpdateTime, 0)

load := systemComputeData.Load
m.metricStore.SetNodeMetric(consts.MetricLoad1MinSystem,
Expand All @@ -405,8 +418,8 @@ func (m *MalachiteMetricsFetcher) processSystemComputeData(systemComputeData *sy
}

func (m *MalachiteMetricsFetcher) processSystemMemoryData(systemMemoryData *system.SystemMemoryData) {
// todo, currently there exits no updateTime, so we will use current-time instead
updateTime := time.Now()
// todo, currently we only get a unified data for the whole system memory data
updateTime := time.Unix(systemMemoryData.UpdateTime, 0)

mem := systemMemoryData.System
m.metricStore.SetNodeMetric(consts.MetricMemTotalSystem,
Expand Down Expand Up @@ -441,8 +454,8 @@ func (m *MalachiteMetricsFetcher) processSystemMemoryData(systemMemoryData *syst
}

func (m *MalachiteMetricsFetcher) processSystemIOData(systemIOData *system.SystemDiskIoData) {
// todo, currently there exits no updateTime, so we will use current-time instead
updateTime := time.Now()
// todo, currently we only get a unified data for the whole system io data
updateTime := time.Unix(systemIOData.UpdateTime, 0)

for _, device := range systemIOData.DiskIo {
m.metricStore.SetDeviceMetric(device.DeviceName, consts.MetricIOReadSystem,
Expand All @@ -455,8 +468,8 @@ func (m *MalachiteMetricsFetcher) processSystemIOData(systemIOData *system.Syste
}

func (m *MalachiteMetricsFetcher) processSystemNumaData(systemMemoryData *system.SystemMemoryData) {
// todo, currently there exits no updateTime, so we will use current-time instead
updateTime := time.Now()
// todo, currently we only get a unified data for the whole system memory data
updateTime := time.Unix(systemMemoryData.UpdateTime, 0)

for _, numa := range systemMemoryData.Numa {
m.metricStore.SetNumaMetric(numa.ID, consts.MetricMemTotalNuma,
Expand Down Expand Up @@ -491,8 +504,8 @@ func (m *MalachiteMetricsFetcher) processSystemNumaData(systemMemoryData *system
}

func (m *MalachiteMetricsFetcher) processSystemCPUComputeData(systemComputeData *system.SystemComputeData) {
// todo, currently there exits no updateTime, so we will use current-time instead
updateTime := time.Now()
// todo, currently we only get a unified data for the whole system compute data
updateTime := time.Unix(systemComputeData.UpdateTime, 0)

for _, cpu := range systemComputeData.CPU {
cpuID, err := strconv.Atoi(cpu.Name[3:])
Expand Down Expand Up @@ -522,8 +535,6 @@ func (m *MalachiteMetricsFetcher) processCgroupCPUData(podUID, containerName str
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPULimitContainer,
metric.MetricData{Value: float64(cpu.CfsQuotaUs) / float64(cpu.CfsPeriodUs), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageContainer,
metric.MetricData{Value: float64(cpu.NewCPUBasicInfo.CPUUsage-cpu.OldCPUBasicInfo.CPUUsage) / (float64(cpu.NewCPUBasicInfo.UpdateTime-cpu.OldCPUBasicInfo.UpdateTime) * 1e9), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageRatioContainer,
metric.MetricData{Value: cpu.CPUUsageRatio, Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageUserContainer,
metric.MetricData{Value: cpu.CPUUserUsageRatio, Time: &updateTime})
Expand Down Expand Up @@ -572,13 +583,6 @@ func (m *MalachiteMetricsFetcher) processCgroupCPUData(podUID, containerName str
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUInstructionsContainer,
metric.MetricData{Value: float64(cpu.Instructions), Time: &updateTime})

updateTimeDiff := cpu.NewCPUBasicInfo.UpdateTime - cpu.OldCPUBasicInfo.UpdateTime
if updateTimeDiff > 0 {
usage := float64(cpu.NewCPUBasicInfo.CPUUsage-cpu.OldCPUBasicInfo.CPUUsage) / (float64(updateTimeDiff) * 1e9)
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageContainer,
metric.MetricData{Value: usage, Time: &updateTime})
}

if cyclesOld.Value > 0 && instructionsOld.Value > 0 {
instructionDiff := float64(cpu.Instructions) - instructionsOld.Value
if instructionDiff > 0 {
Expand All @@ -592,7 +596,7 @@ func (m *MalachiteMetricsFetcher) processCgroupCPUData(podUID, containerName str
cpu := cgStats.V2.Cpu
updateTime := time.Unix(cgStats.V2.Cpu.UpdateTime, 0)

m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageRatioContainer,
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageContainer,
metric.MetricData{Value: cpu.CPUUsageRatio, Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUUsageUserContainer,
metric.MetricData{Value: cpu.CPUUserUsageRatio, Time: &updateTime})
Expand Down Expand Up @@ -682,6 +686,10 @@ func (m *MalachiteMetricsFetcher) processCgroupMemoryData(podUID, containerName

m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemUsageContainer,
metric.MetricData{Value: float64(mem.MemoryUsageInBytes), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemRssContainer,
metric.MetricData{Value: float64(mem.MemStats.Anon), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemCacheContainer,
metric.MetricData{Value: float64(mem.MemStats.File), Time: &updateTime})

m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricMemShmemContainer,
metric.MetricData{Value: float64(mem.MemStats.Shmem), Time: &updateTime})
Expand Down
2 changes: 2 additions & 0 deletions pkg/metaserver/agent/metric/metric_calculation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

// processContainerMemBandwidth handles the memory bandwidth (read/write) rate over a period of time,
// and it needs the previously collected data to do this
func (m *MalachiteMetricsFetcher) processContainerMemBandwidth(podUID, containerName string, cgStats *cgroup.MalachiteCgroupInfo) {
var (
lastOCRReadDRAMs, _ = m.metricStore.GetContainerMetric(podUID, containerName, consts.MetricOCRReadDRAMsContainer)
Expand Down

0 comments on commit fa3b08b

Please sign in to comment.