From 9ea0225dc170505ac5664d5f8252713bec1e19b4 Mon Sep 17 00:00:00 2001 From: "baojiangnan.bjn" Date: Tue, 2 Jan 2024 10:55:07 +0800 Subject: [PATCH] fix container metrics --- .../sysadvisor/inference/inference_plugin.go | 3 + pkg/consts/metric.go | 7 +- .../agent/metric/malachite/fetcher.go | 67 +++++++++++----- .../metric/malachite/fetcher_calculate.go | 76 +++++++++++++++++-- 4 files changed, 128 insertions(+), 25 deletions(-) diff --git a/cmd/katalyst-agent/app/options/sysadvisor/inference/inference_plugin.go b/cmd/katalyst-agent/app/options/sysadvisor/inference/inference_plugin.go index d95fb2c1b..85bc7b6d7 100644 --- a/cmd/katalyst-agent/app/options/sysadvisor/inference/inference_plugin.go +++ b/cmd/katalyst-agent/app/options/sysadvisor/inference/inference_plugin.go @@ -25,6 +25,9 @@ import ( ) const ( + // System Metrics Collector samples a time per 10s + // So it may produce two same inference result within per 10s. + // TODO So it's better to retrieve inference results and use them per 10s. defaultInferenceSyncPeriod = 5 * time.Second ) diff --git a/pkg/consts/metric.go b/pkg/consts/metric.go index c96f46654..8b8577b58 100644 --- a/pkg/consts/metric.go +++ b/pkg/consts/metric.go @@ -125,7 +125,7 @@ const ( MetricStoreInsContainer = "cpu.store.ins.container" MetricCPUNrThrottledRateContainer = MetricCPUNrThrottledContainer + Rate - MetricCPUNRdPeriodRateContainer = MetricCPUNrPeriodContainer + Rate + MetricCPUNrPeriodRateContainer = MetricCPUNrPeriodContainer + Rate MetricCPUThrottledTimeRateContainer = MetricCPUThrottledTimeContainer + Rate MetricCPUUpdateTimeContainer = "cpu.updatetime.container" @@ -174,6 +174,11 @@ const ( // container net metrics const ( + MetricNetTcpSendBytesContainer = "net.tcp.send.bytes.container" + MetricNetTcpSendPacketsContainer = "net.tcp.send.packets.container" + MetricNetTcpRecvBytesContainer = "net.tcp.recv.bytes.container" + MetricNetTcpRecvPacketsContainer = "net.tcp.recv.packets.container" + MetricNetTcpSendBPSContainer = "net.tcp.send.bps.container" MetricNetTcpSendPpsContainer = "net.tcp.send.pps.container" MetricNetTcpRecvBPSContainer = "net.tcp.recv.bps.container" diff --git a/pkg/metaserver/agent/metric/malachite/fetcher.go b/pkg/metaserver/agent/metric/malachite/fetcher.go index 476cb605a..634bb9236 100644 --- a/pkg/metaserver/agent/metric/malachite/fetcher.go +++ b/pkg/metaserver/agent/metric/malachite/fetcher.go @@ -48,6 +48,9 @@ const ( metricsNameMalachiteGetSystemStatusFailed = "malachite_get_system_status_failed" metricsNameMalachiteGetPodStatusFailed = "malachite_get_pod_status_failed" + // Typically, katalyst's metric component does sampling per 10s. + defaultMetricUpdateInterval = 10.0 + pageShift = 12 ) @@ -733,7 +736,7 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrPeriodContainer, utilmetric.MetricData{Value: float64(cpu.CPUNrPeriods), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUThrottledTimeContainer, - utilmetric.MetricData{Value: float64(cpu.CPUThrottledTime), Time: &updateTime}) + utilmetric.MetricData{Value: float64(cpu.CPUThrottledTime / 1000), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrRunnableContainer, utilmetric.MetricData{Value: float64(cpu.TaskNrRunning), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrUninterruptibleContainer, @@ -762,8 +765,14 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName utilmetric.MetricData{Value: float64(cpu.Cycles), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUInstructionsContainer, utilmetric.MetricData{Value: float64(cpu.Instructions), Time: &updateTime}) - m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer, - utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime}) + // L3Misses is similar to OcrReadDrams + if cpu.L3Misses > 0 { + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer, + utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime}) + } else if cpu.OcrReadDrams > 0 { + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer, + utilmetric.MetricData{Value: float64(cpu.OcrReadDrams), Time: &updateTime}) + } if cyclesOld.Value > 0 && instructionsOld.Value > 0 { instructionDiff := float64(cpu.Instructions) - instructionsOld.Value @@ -792,6 +801,12 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName utilmetric.MetricData{Value: float64(cpu.TaskNrUninterruptible), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrIOWaitContainer, utilmetric.MetricData{Value: float64(cpu.TaskNrIoWait), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUThrottledTimeContainer, + utilmetric.MetricData{Value: float64(cpu.CPUStats.ThrottledUsec), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrThrottledContainer, + utilmetric.MetricData{Value: float64(cpu.CPUStats.NrThrottled), Time: &updateTime}) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrPeriodContainer, + utilmetric.MetricData{Value: float64(cpu.CPUStats.NrPeriods), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricLoad1MinContainer, utilmetric.MetricData{Value: cpu.Load.One, Time: &updateTime}) @@ -814,9 +829,14 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName utilmetric.MetricData{Value: float64(cpu.Cycles), Time: &updateTime}) m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUInstructionsContainer, utilmetric.MetricData{Value: float64(cpu.Instructions), Time: &updateTime}) - m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer, - utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime}) - + // L3Misses is similar to OcrReadDrams + if cpu.L3Misses > 0 { + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer, + utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime}) + } else if cpu.OcrReadDrams > 0 { + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer, + utilmetric.MetricData{Value: float64(cpu.OcrReadDrams), Time: &updateTime}) + } if cyclesOld.Value > 0 && instructionsOld.Value > 0 { instructionDiff := float64(cpu.Instructions) - instructionsOld.Value if instructionDiff > 0 { @@ -974,25 +994,34 @@ func (m *MalachiteMetricsFetcher) processContainerNetData(podUID, containerName net = cgStats.V2.NetCls updateTime = time.Unix(cgStats.V2.NetCls.UpdateTime, 0) } - if net == nil { return } lastUpdateTimeMetric, _ := m.metricStore.GetContainerMetric(podUID, containerName, consts.MetricNetworkUpdateTimeContainer) + m.processContainerNetRelevantRate(podUID, containerName, cgStats, lastUpdateTimeMetric.Value) - m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendBPSContainer, func() float64 { - return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPRxBytes, net.BpfNetData.NetTCPRxBytes)) - }, int64(lastUpdateTimeMetric.Value), updateTime.Unix()) - m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvBPSContainer, func() float64 { - return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPRxBytes, net.BpfNetData.NetTCPRxBytes)) - }, int64(lastUpdateTimeMetric.Value), updateTime.Unix()) - m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendPpsContainer, func() float64 { - return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPTx, net.BpfNetData.NetTCPTx)) - }, int64(lastUpdateTimeMetric.Value), updateTime.Unix()) - m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvPpsContainer, func() float64 { - return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPRx, net.BpfNetData.NetTCPRx)) - }, int64(lastUpdateTimeMetric.Value), updateTime.Unix()) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvPacketsContainer, utilmetric.MetricData{ + Value: float64(net.BpfNetData.NetTCPRx), + Time: &updateTime, + }) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendPacketsContainer, utilmetric.MetricData{ + Value: float64(net.BpfNetData.NetTCPTx), + Time: &updateTime, + }) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvBytesContainer, utilmetric.MetricData{ + Value: float64(net.BpfNetData.NetTCPRxBytes), + Time: &updateTime, + }) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendBytesContainer, utilmetric.MetricData{ + Value: float64(net.BpfNetData.NetTCPTxBytes), + Time: &updateTime, + }) + + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetworkUpdateTimeContainer, utilmetric.MetricData{ + Value: float64(updateTime.Unix()), + Time: &updateTime, + }) } // Currently, these valid perf event data are provided through types.MalachiteCgroupInfo.V1/V2.CPU by malachite. diff --git a/pkg/metaserver/agent/metric/malachite/fetcher_calculate.go b/pkg/metaserver/agent/metric/malachite/fetcher_calculate.go index 24e8eb589..90c028325 100644 --- a/pkg/metaserver/agent/metric/malachite/fetcher_calculate.go +++ b/pkg/metaserver/agent/metric/malachite/fetcher_calculate.go @@ -100,7 +100,7 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai lastCPUIns = uint64(lastMetricValueFn(consts.MetricCPUInstructionsContainer)) lastCPUCycles = uint64(lastMetricValueFn(consts.MetricCPUCyclesContainer)) lastCPUNRTht = uint64(lastMetricValueFn(consts.MetricCPUNrThrottledContainer)) - lastCPUNRPeriod = uint64(lastMetricValueFn(consts.MetricCPUPeriodContainer)) + lastCPUNRPeriod = uint64(lastMetricValueFn(consts.MetricCPUNrPeriodContainer)) lastThrottleTime = uint64(lastMetricValueFn(consts.MetricCPUThrottledTimeContainer)) lastL3CacheMiss = uint64(lastMetricValueFn(consts.MetricCPUL3CacheMissContainer)) @@ -115,7 +115,11 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai curCPUNRTht = cgStats.V1.Cpu.CPUNrThrottled curCPUNRPeriod = cgStats.V1.Cpu.CPUNrPeriods curCPUThrottleTime = cgStats.V1.Cpu.CPUThrottledTime - curL3CacheMiss = cgStats.V1.Cpu.L3Misses + if cgStats.V1.Cpu.L3Misses > 0 { + curL3CacheMiss = cgStats.V1.Cpu.L3Misses + } else if cgStats.V1.Cpu.OcrReadDrams > 0 { + curL3CacheMiss = cgStats.V1.Cpu.OcrReadDrams + } curUpdateTime = cgStats.V1.Cpu.UpdateTime } else if cgStats.CgroupType == "V2" { curCPUIns = cgStats.V2.Cpu.Instructions @@ -123,12 +127,15 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai curCPUNRTht = cgStats.V2.Cpu.CPUStats.NrThrottled curCPUNRPeriod = cgStats.V2.Cpu.CPUStats.NrPeriods curCPUThrottleTime = cgStats.V2.Cpu.CPUStats.ThrottledUsec - curL3CacheMiss = cgStats.V2.Cpu.L3Misses + if cgStats.V2.Cpu.L3Misses > 0 { + curL3CacheMiss = cgStats.V2.Cpu.L3Misses + } else if cgStats.V2.Cpu.OcrReadDrams > 0 { + curL3CacheMiss = cgStats.V2.Cpu.OcrReadDrams + } curUpdateTime = cgStats.V2.Cpu.UpdateTime } else { return } - m.setContainerRateMetric(podUID, containerName, consts.MetricCPUInstructionsRateContainer, func() float64 { return float64(uint64CounterDelta(lastCPUIns, curCPUIns)) }, int64(lastUpdateTimeInSec), curUpdateTime) @@ -138,7 +145,7 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai m.setContainerRateMetric(podUID, containerName, consts.MetricCPUNrThrottledRateContainer, func() float64 { return float64(uint64CounterDelta(lastCPUNRTht, curCPUNRTht)) }, int64(lastUpdateTimeInSec), curUpdateTime) - m.setContainerRateMetric(podUID, containerName, consts.MetricCPUNRdPeriodRateContainer, func() float64 { + m.setContainerRateMetric(podUID, containerName, consts.MetricCPUNrPeriodRateContainer, func() float64 { return float64(uint64CounterDelta(lastCPUNRPeriod, curCPUNRPeriod)) }, int64(lastUpdateTimeInSec), curUpdateTime) m.setContainerRateMetric(podUID, containerName, consts.MetricCPUThrottledTimeRateContainer, func() float64 { @@ -190,6 +197,65 @@ func (m *MalachiteMetricsFetcher) processContainerMemRelevantRate(podUID, contai }, int64(lastUpdateTimeInSec), curUpdateTime) } +func (m *MalachiteMetricsFetcher) processContainerNetRelevantRate(podUID, containerName string, cgStats *types.MalachiteCgroupInfo, lastUpdateTimeInSec float64) { + lastMetricValueFn := func(metricName string) float64 { + lastMetric, _ := m.metricStore.GetContainerMetric(podUID, containerName, metricName) + return lastMetric.Value + } + + var ( + lastNetTCPRx = uint64(lastMetricValueFn(consts.MetricNetTcpRecvPacketsContainer)) + lastNetTCPTx = uint64(lastMetricValueFn(consts.MetricNetTcpSendPacketsContainer)) + lastNetTCPRxBytes = uint64(lastMetricValueFn(consts.MetricNetTcpRecvBytesContainer)) + lastNetTCPTxBytes = uint64(lastMetricValueFn(consts.MetricNetTcpSendBytesContainer)) + + netData *types.NetClsCgData + ) + + if cgStats.V1 != nil { + netData = cgStats.V1.NetCls + } else if cgStats.V2 != nil { + netData = cgStats.V2.NetCls + } else { + return + } + + curUpdateTime := netData.UpdateTime + _curUpdateTime := time.Unix(curUpdateTime, 0) + updateTimeDiff := float64(curUpdateTime) - lastUpdateTimeInSec + if updateTimeDiff > 0 { + m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendBPSContainer, func() float64 { + return float64(uint64CounterDelta(lastNetTCPTxBytes, netData.BpfNetData.NetTCPTxBytes)) + }, int64(lastUpdateTimeInSec), curUpdateTime) + m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvBPSContainer, func() float64 { + return float64(uint64CounterDelta(lastNetTCPRxBytes, netData.BpfNetData.NetTCPRxBytes)) + }, int64(lastUpdateTimeInSec), curUpdateTime) + m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendPpsContainer, func() float64 { + return float64(uint64CounterDelta(lastNetTCPTx, netData.BpfNetData.NetTCPTx)) + }, int64(lastUpdateTimeInSec), curUpdateTime) + m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvPpsContainer, func() float64 { + return float64(uint64CounterDelta(lastNetTCPRx, netData.BpfNetData.NetTCPRx)) + }, int64(lastUpdateTimeInSec), curUpdateTime) + } else { + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendBPSContainer, metric.MetricData{ + Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPTxBytes, netData.BpfNetData.NetTCPTxBytes)) / defaultMetricUpdateInterval, + Time: &_curUpdateTime, + }) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvBPSContainer, metric.MetricData{ + Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPRxBytes, netData.BpfNetData.NetTCPRxBytes)) / defaultMetricUpdateInterval, + Time: &_curUpdateTime, + }) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendPpsContainer, metric.MetricData{ + Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPTx, netData.BpfNetData.NetTCPTx)) / defaultMetricUpdateInterval, + Time: &_curUpdateTime, + }) + m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvPpsContainer, metric.MetricData{ + Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPRx, netData.BpfNetData.NetTCPRx)) / defaultMetricUpdateInterval, + Time: &_curUpdateTime, + }) + } +} + // setContainerRateMetric is used to set rate metric in container level. // This method will check if the metric is really updated, and decide weather to update metric in metricStore. // The method could help avoid lots of meaningless "zero" value.