Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix container metrics #428

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
)

const (
// defaultInferenceSyncPeriod is the default inference sync period.
// The system metrics collector samples once every 10s, so with a 5s period
// the same inference result may be produced twice within one 10s window.
// TODO: retrieve and use inference results once every 10s instead.
defaultInferenceSyncPeriod = 5 * time.Second
)

Expand Down
7 changes: 6 additions & 1 deletion pkg/consts/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ const (
MetricStoreInsContainer = "cpu.store.ins.container"

MetricCPUNrThrottledRateContainer = MetricCPUNrThrottledContainer + Rate
MetricCPUNRdPeriodRateContainer = MetricCPUNrPeriodContainer + Rate
MetricCPUNrPeriodRateContainer = MetricCPUNrPeriodContainer + Rate
MetricCPUThrottledTimeRateContainer = MetricCPUThrottledTimeContainer + Rate

MetricCPUUpdateTimeContainer = "cpu.updatetime.container"
Expand Down Expand Up @@ -174,6 +174,11 @@ const (

// container net metrics
const (
MetricNetTcpSendBytesContainer = "net.tcp.send.bytes.container"
MetricNetTcpSendPacketsContainer = "net.tcp.send.packets.container"
MetricNetTcpRecvBytesContainer = "net.tcp.recv.bytes.container"
MetricNetTcpRecvPacketsContainer = "net.tcp.recv.packets.container"

MetricNetTcpSendBPSContainer = "net.tcp.send.bps.container"
MetricNetTcpSendPpsContainer = "net.tcp.send.pps.container"
MetricNetTcpRecvBPSContainer = "net.tcp.recv.bps.container"
Expand Down
67 changes: 48 additions & 19 deletions pkg/metaserver/agent/metric/malachite/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
metricsNameMalachiteGetSystemStatusFailed = "malachite_get_system_status_failed"
metricsNameMalachiteGetPodStatusFailed = "malachite_get_pod_status_failed"

// Typically, katalyst's metric component samples once every 10s.
defaultMetricUpdateInterval = 10.0

pageShift = 12
)

Expand Down Expand Up @@ -733,7 +736,7 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrPeriodContainer,
utilmetric.MetricData{Value: float64(cpu.CPUNrPeriods), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUThrottledTimeContainer,
utilmetric.MetricData{Value: float64(cpu.CPUThrottledTime), Time: &updateTime})
utilmetric.MetricData{Value: float64(cpu.CPUThrottledTime / 1000), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrRunnableContainer,
utilmetric.MetricData{Value: float64(cpu.TaskNrRunning), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrUninterruptibleContainer,
Expand Down Expand Up @@ -762,8 +765,14 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName
utilmetric.MetricData{Value: float64(cpu.Cycles), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUInstructionsContainer,
utilmetric.MetricData{Value: float64(cpu.Instructions), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer,
utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime})
// Prefer L3Misses; when it is not positive, fall back to OcrReadDrams,
// which is a similar counter.
if cpu.L3Misses > 0 {
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer,
utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime})
} else if cpu.OcrReadDrams > 0 {
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer,
utilmetric.MetricData{Value: float64(cpu.OcrReadDrams), Time: &updateTime})
}

if cyclesOld.Value > 0 && instructionsOld.Value > 0 {
instructionDiff := float64(cpu.Instructions) - instructionsOld.Value
Expand Down Expand Up @@ -792,6 +801,12 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName
utilmetric.MetricData{Value: float64(cpu.TaskNrUninterruptible), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrIOWaitContainer,
utilmetric.MetricData{Value: float64(cpu.TaskNrIoWait), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUThrottledTimeContainer,
utilmetric.MetricData{Value: float64(cpu.CPUStats.ThrottledUsec), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrThrottledContainer,
utilmetric.MetricData{Value: float64(cpu.CPUStats.NrThrottled), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUNrPeriodContainer,
utilmetric.MetricData{Value: float64(cpu.CPUStats.NrPeriods), Time: &updateTime})

m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricLoad1MinContainer,
utilmetric.MetricData{Value: cpu.Load.One, Time: &updateTime})
Expand All @@ -814,9 +829,14 @@ func (m *MalachiteMetricsFetcher) processContainerCPUData(podUID, containerName
utilmetric.MetricData{Value: float64(cpu.Cycles), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUInstructionsContainer,
utilmetric.MetricData{Value: float64(cpu.Instructions), Time: &updateTime})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer,
utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime})

// Prefer L3Misses; when it is not positive, fall back to OcrReadDrams,
// which is a similar counter.
if cpu.L3Misses > 0 {
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer,
utilmetric.MetricData{Value: float64(cpu.L3Misses), Time: &updateTime})
} else if cpu.OcrReadDrams > 0 {
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricCPUL3CacheMissContainer,
utilmetric.MetricData{Value: float64(cpu.OcrReadDrams), Time: &updateTime})
}
if cyclesOld.Value > 0 && instructionsOld.Value > 0 {
instructionDiff := float64(cpu.Instructions) - instructionsOld.Value
if instructionDiff > 0 {
Expand Down Expand Up @@ -974,25 +994,34 @@ func (m *MalachiteMetricsFetcher) processContainerNetData(podUID, containerName
net = cgStats.V2.NetCls
updateTime = time.Unix(cgStats.V2.NetCls.UpdateTime, 0)
}

if net == nil {
return
}

lastUpdateTimeMetric, _ := m.metricStore.GetContainerMetric(podUID, containerName, consts.MetricNetworkUpdateTimeContainer)
m.processContainerNetRelevantRate(podUID, containerName, cgStats, lastUpdateTimeMetric.Value)

m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendBPSContainer, func() float64 {
return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPRxBytes, net.BpfNetData.NetTCPRxBytes))
}, int64(lastUpdateTimeMetric.Value), updateTime.Unix())
m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvBPSContainer, func() float64 {
return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPRxBytes, net.BpfNetData.NetTCPRxBytes))
}, int64(lastUpdateTimeMetric.Value), updateTime.Unix())
m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendPpsContainer, func() float64 {
return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPTx, net.BpfNetData.NetTCPTx))
}, int64(lastUpdateTimeMetric.Value), updateTime.Unix())
m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvPpsContainer, func() float64 {
return float64(uint64CounterDelta(net.OldBpfNetData.NetTCPRx, net.BpfNetData.NetTCPRx))
}, int64(lastUpdateTimeMetric.Value), updateTime.Unix())
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvPacketsContainer, utilmetric.MetricData{
Value: float64(net.BpfNetData.NetTCPRx),
Time: &updateTime,
})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendPacketsContainer, utilmetric.MetricData{
Value: float64(net.BpfNetData.NetTCPTx),
Time: &updateTime,
})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvBytesContainer, utilmetric.MetricData{
Value: float64(net.BpfNetData.NetTCPRxBytes),
Time: &updateTime,
})
m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendBytesContainer, utilmetric.MetricData{
Value: float64(net.BpfNetData.NetTCPTxBytes),
Time: &updateTime,
})

m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetworkUpdateTimeContainer, utilmetric.MetricData{
Value: float64(updateTime.Unix()),
Time: &updateTime,
})
}

// Currently, these valid perf event data are provided through types.MalachiteCgroupInfo.V1/V2.CPU by malachite.
Expand Down
76 changes: 71 additions & 5 deletions pkg/metaserver/agent/metric/malachite/fetcher_calculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai
lastCPUIns = uint64(lastMetricValueFn(consts.MetricCPUInstructionsContainer))
lastCPUCycles = uint64(lastMetricValueFn(consts.MetricCPUCyclesContainer))
lastCPUNRTht = uint64(lastMetricValueFn(consts.MetricCPUNrThrottledContainer))
lastCPUNRPeriod = uint64(lastMetricValueFn(consts.MetricCPUPeriodContainer))
lastCPUNRPeriod = uint64(lastMetricValueFn(consts.MetricCPUNrPeriodContainer))
lastThrottleTime = uint64(lastMetricValueFn(consts.MetricCPUThrottledTimeContainer))
lastL3CacheMiss = uint64(lastMetricValueFn(consts.MetricCPUL3CacheMissContainer))

Expand All @@ -115,20 +115,27 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai
curCPUNRTht = cgStats.V1.Cpu.CPUNrThrottled
curCPUNRPeriod = cgStats.V1.Cpu.CPUNrPeriods
curCPUThrottleTime = cgStats.V1.Cpu.CPUThrottledTime
curL3CacheMiss = cgStats.V1.Cpu.L3Misses
if cgStats.V1.Cpu.L3Misses > 0 {
curL3CacheMiss = cgStats.V1.Cpu.L3Misses
} else if cgStats.V1.Cpu.OcrReadDrams > 0 {
curL3CacheMiss = cgStats.V1.Cpu.OcrReadDrams
}
curUpdateTime = cgStats.V1.Cpu.UpdateTime
} else if cgStats.CgroupType == "V2" {
curCPUIns = cgStats.V2.Cpu.Instructions
curCPUCycles = cgStats.V2.Cpu.Cycles
curCPUNRTht = cgStats.V2.Cpu.CPUStats.NrThrottled
curCPUNRPeriod = cgStats.V2.Cpu.CPUStats.NrPeriods
curCPUThrottleTime = cgStats.V2.Cpu.CPUStats.ThrottledUsec
curL3CacheMiss = cgStats.V2.Cpu.L3Misses
if cgStats.V2.Cpu.L3Misses > 0 {
curL3CacheMiss = cgStats.V2.Cpu.L3Misses
} else if cgStats.V2.Cpu.OcrReadDrams > 0 {
curL3CacheMiss = cgStats.V2.Cpu.OcrReadDrams
}
curUpdateTime = cgStats.V2.Cpu.UpdateTime
} else {
return
}

m.setContainerRateMetric(podUID, containerName, consts.MetricCPUInstructionsRateContainer, func() float64 {
return float64(uint64CounterDelta(lastCPUIns, curCPUIns))
}, int64(lastUpdateTimeInSec), curUpdateTime)
Expand All @@ -138,7 +145,7 @@ func (m *MalachiteMetricsFetcher) processContainerCPURelevantRate(podUID, contai
m.setContainerRateMetric(podUID, containerName, consts.MetricCPUNrThrottledRateContainer, func() float64 {
return float64(uint64CounterDelta(lastCPUNRTht, curCPUNRTht))
}, int64(lastUpdateTimeInSec), curUpdateTime)
m.setContainerRateMetric(podUID, containerName, consts.MetricCPUNRdPeriodRateContainer, func() float64 {
m.setContainerRateMetric(podUID, containerName, consts.MetricCPUNrPeriodRateContainer, func() float64 {
return float64(uint64CounterDelta(lastCPUNRPeriod, curCPUNRPeriod))
}, int64(lastUpdateTimeInSec), curUpdateTime)
m.setContainerRateMetric(podUID, containerName, consts.MetricCPUThrottledTimeRateContainer, func() float64 {
Expand Down Expand Up @@ -190,6 +197,65 @@ func (m *MalachiteMetricsFetcher) processContainerMemRelevantRate(podUID, contai
}, int64(lastUpdateTimeInSec), curUpdateTime)
}

// processContainerNetRelevantRate derives the container-level TCP send/receive
// rate metrics (the *BPS and *Pps container metrics) and stores them in the
// metric store.
//
// Parameters:
//   - podUID, containerName: identify the container whose metrics are updated.
//   - cgStats: cgroup stats of the container; net data is taken from V1.NetCls
//     when V1 is set, otherwise from V2.NetCls.
//   - lastUpdateTimeInSec: unix timestamp (seconds) of the previous network
//     sample, as stored under consts.MetricNetworkUpdateTimeContainer.
//
// If the sample timestamp has advanced since the previous sample, the rates are
// computed by setContainerRateMetric from the delta between the counters last
// written to the metric store and the current counters. Otherwise the delta
// between OldBpfNetData and BpfNetData is divided by
// defaultMetricUpdateInterval as a fallback.
//
// The function returns without touching the store when cgStats is nil, when
// neither cgroup version is populated, or when the selected version carries no
// net_cls data.
func (m *MalachiteMetricsFetcher) processContainerNetRelevantRate(podUID, containerName string, cgStats *types.MalachiteCgroupInfo, lastUpdateTimeInSec float64) {
	if cgStats == nil {
		return
	}

	var netData *types.NetClsCgData
	switch {
	case cgStats.V1 != nil:
		netData = cgStats.V1.NetCls
	case cgStats.V2 != nil:
		netData = cgStats.V2.NetCls
	}
	// Guard against a populated cgroup entry without net_cls data; dereferencing
	// it below would panic.
	if netData == nil {
		return
	}

	curUpdateTime := netData.UpdateTime

	if float64(curUpdateTime)-lastUpdateTimeInSec > 0 {
		// lastCounter reads the previously stored cumulative counter. The lookup
		// error is ignored on purpose (best effort, same as before); presumably a
		// missing metric yields a zero value — confirm against the metric store.
		lastCounter := func(metricName string) uint64 {
			lastMetric, _ := m.metricStore.GetContainerMetric(podUID, containerName, metricName)
			return uint64(lastMetric.Value)
		}

		var (
			lastNetTCPRx      = lastCounter(consts.MetricNetTcpRecvPacketsContainer)
			lastNetTCPTx      = lastCounter(consts.MetricNetTcpSendPacketsContainer)
			lastNetTCPRxBytes = lastCounter(consts.MetricNetTcpRecvBytesContainer)
			lastNetTCPTxBytes = lastCounter(consts.MetricNetTcpSendBytesContainer)

			lastUpdateTime = int64(lastUpdateTimeInSec)
		)

		// The delta closures stay lazy so that the current counters are only
		// read if setContainerRateMetric decides to evaluate them.
		m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendBPSContainer, func() float64 {
			return float64(uint64CounterDelta(lastNetTCPTxBytes, netData.BpfNetData.NetTCPTxBytes))
		}, lastUpdateTime, curUpdateTime)
		m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvBPSContainer, func() float64 {
			return float64(uint64CounterDelta(lastNetTCPRxBytes, netData.BpfNetData.NetTCPRxBytes))
		}, lastUpdateTime, curUpdateTime)
		m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpSendPpsContainer, func() float64 {
			return float64(uint64CounterDelta(lastNetTCPTx, netData.BpfNetData.NetTCPTx))
		}, lastUpdateTime, curUpdateTime)
		m.setContainerRateMetric(podUID, containerName, consts.MetricNetTcpRecvPpsContainer, func() float64 {
			return float64(uint64CounterDelta(lastNetTCPRx, netData.BpfNetData.NetTCPRx))
		}, lastUpdateTime, curUpdateTime)
		return
	}

	// The timestamp did not advance: fall back to the old/current counter pair
	// reported in the same sample, averaged over the default sampling interval.
	sampleTime := time.Unix(curUpdateTime, 0)

	m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendBPSContainer, metric.MetricData{
		Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPTxBytes, netData.BpfNetData.NetTCPTxBytes)) / defaultMetricUpdateInterval,
		Time:  &sampleTime,
	})
	m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvBPSContainer, metric.MetricData{
		Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPRxBytes, netData.BpfNetData.NetTCPRxBytes)) / defaultMetricUpdateInterval,
		Time:  &sampleTime,
	})
	m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpSendPpsContainer, metric.MetricData{
		Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPTx, netData.BpfNetData.NetTCPTx)) / defaultMetricUpdateInterval,
		Time:  &sampleTime,
	})
	m.metricStore.SetContainerMetric(podUID, containerName, consts.MetricNetTcpRecvPpsContainer, metric.MetricData{
		Value: float64(uint64CounterDelta(netData.OldBpfNetData.NetTCPRx, netData.BpfNetData.NetTCPRx)) / defaultMetricUpdateInterval,
		Time:  &sampleTime,
	})
}

// setContainerRateMetric is used to set rate metric in container level.
// This method will check if the metric is really updated, and decide whether to update metric in metricStore.
// The method could help avoid lots of meaningless "zero" value.
Expand Down