Skip to content

Commit

Permalink
Update hostmetrics load scraper to use new perfcounters package (#1868)
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington authored Sep 28, 2020
1 parent 40544e7 commit 8f265cc
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package perfcounters

import (
"fmt"

"go.opentelemetry.io/collector/internal/processor/filterset"
)

Expand Down Expand Up @@ -76,3 +78,90 @@ func (obj mockPerfDataObjectError) Filter(includeFS, excludeFS filterset.FilterS
func (obj mockPerfDataObjectError) GetValues(counterNames ...string) ([]*CounterValues, error) {
return nil, obj.getValuesErr
}

// MockPerfCounterScraper is an implementation of PerfCounterScraper that returns the supplied
// object / counter values on each successive call to Scrape, in the specified order.
//
// Example Usage:
//
// s := NewMockPerfCounterScraper(map[string]map[string][]int64{
// "Object1": map[string][]int64{
// "Counter1": []int64{1, 2},
// "Counter2": []int64{4},
// },
// })
//
// s.Scrape().GetObject("Object1").GetValues("Counter1", "Counter2")
//
// ... 1st call returns []*CounterValues{ { Values: { "Counter1": 1, "Counter2": 4 } } }
// ... 2nd call returns []*CounterValues{ { Values: { "Counter1": 2, "Counter2": 4 } } }
type MockPerfCounterScraper struct {
objectsAndValuesToReturn map[string]map[string][]int64
timesCalled int
}

// NewMockPerfCounterScraper returns a MockPerfCounterScraper that will return the supplied
// object / counter values on each successive call to Scrape, in the specified order.
func NewMockPerfCounterScraper(objectsAndValuesToReturn map[string]map[string][]int64) *MockPerfCounterScraper {
return &MockPerfCounterScraper{objectsAndValuesToReturn: objectsAndValuesToReturn}
}

// Initialize is a no-op
func (p *MockPerfCounterScraper) Initialize(objects ...string) error {
return nil
}

// Scrape returns a perf data collection with the supplied object / counter values,
// according to the supplied order.
func (p *MockPerfCounterScraper) Scrape() (PerfDataCollection, error) {
objectsAndValuesToReturn := make(map[string]map[string]int64, len(p.objectsAndValuesToReturn))
for objectName, countersToReturn := range p.objectsAndValuesToReturn {
valuesToReturn := make(map[string]int64, len(countersToReturn))
for counterName, orderedValuesToReturn := range countersToReturn {
returnIndex := p.timesCalled
if returnIndex >= len(orderedValuesToReturn) {
returnIndex = len(orderedValuesToReturn) - 1
}
valuesToReturn[counterName] = orderedValuesToReturn[returnIndex]
}
objectsAndValuesToReturn[objectName] = valuesToReturn
}

p.timesCalled++
return mockPerfDataCollection{objectsAndValuesToReturn: objectsAndValuesToReturn}, nil
}

type mockPerfDataCollection struct {
objectsAndValuesToReturn map[string]map[string]int64
}

// GetObject returns the specified object / counter values
func (p mockPerfDataCollection) GetObject(objectName string) (PerfDataObject, error) {
valuesToReturn, ok := p.objectsAndValuesToReturn[objectName]
if !ok {
return nil, fmt.Errorf("Unable to find object %q", objectName)
}

return mockPerfDataObject{valuesToReturn: valuesToReturn}, nil
}

type mockPerfDataObject struct {
valuesToReturn map[string]int64
}

// Filter is a no-op
func (obj mockPerfDataObject) Filter(includeFS, excludeFS filterset.FilterSet, includeTotal bool) {
}

// GetValues returns the specified counter values
func (obj mockPerfDataObject) GetValues(counterNames ...string) ([]*CounterValues, error) {
value := &CounterValues{Values: map[string]int64{}}
for _, counterName := range counterNames {
valueToReturn, ok := obj.valuesToReturn[counterName]
if !ok {
return nil, fmt.Errorf("Mock Perf Counter Scraper configured incorrectly. Return value for counter %q not specified", counterName)
}
value.Values[counterName] = valueToReturn
}
return []*CounterValues{value}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ import (
"github.com/shirou/gopsutil/load"
"go.uber.org/zap"

"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters"
)

// Sample processor queue length at a 5s frequency, and calculate exponentially weighted moving averages
// as per https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation

const processorQueueLengthPath = `\System\Processor Queue Length`
const (
system = "System"
processorQueueLength = "Processor Queue Length"
)

var (
samplingFrequency = 5 * time.Second
Expand All @@ -49,13 +52,13 @@ var (
)

type sampler struct {
done chan struct{}
logger *zap.Logger
processorQueueLengthCounter pdh.PerfCounterScraper
loadAvg1m float64
loadAvg5m float64
loadAvg15m float64
lock sync.RWMutex
done chan struct{}
logger *zap.Logger
perfCounterScraper perfcounters.PerfCounterScraper
loadAvg1m float64
loadAvg5m float64
loadAvg15m float64
lock sync.RWMutex
}

func startSampling(_ context.Context, logger *zap.Logger) error {
Expand All @@ -80,15 +83,15 @@ func startSampling(_ context.Context, logger *zap.Logger) error {
}

func newSampler(logger *zap.Logger) (*sampler, error) {
processorQueueLengthCounter, err := pdh.NewPerfCounter(processorQueueLengthPath, false)
if err != nil {
perfCounterScraper := &perfcounters.PerfLibScraper{}
if err := perfCounterScraper.Initialize(system); err != nil {
return nil, err
}

sampler := &sampler{
logger: logger,
processorQueueLengthCounter: processorQueueLengthCounter,
done: make(chan struct{}),
logger: logger,
perfCounterScraper: perfCounterScraper,
done: make(chan struct{}),
}

return sampler, nil
Expand All @@ -111,13 +114,25 @@ func (sw *sampler) startSamplingTicker() {
}

func (sw *sampler) sampleLoad() {
counterValues, err := sw.processorQueueLengthCounter.ScrapeData()
counters, err := sw.perfCounterScraper.Scrape()
if err != nil {
sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err))
return
}

currentLoad := counterValues[0].Value
systemObject, err := counters.GetObject(system)
if err != nil {
sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err))
return
}

counterValues, err := systemObject.GetValues(processorQueueLength)
if err != nil {
sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err))
return
}

currentLoad := float64(counterValues[0].Values[processorQueueLength])

sw.lock.Lock()
defer sw.lock.Unlock()
Expand All @@ -137,7 +152,7 @@ func stopSampling(_ context.Context) error {
}

close(samplerInstance.done)
return samplerInstance.processorQueueLengthCounter.Close()
return nil
}

func getSampledLoadAverages() (*load.AvgStat, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters"
)

func TestStartSampling(t *testing.T) {
Expand All @@ -38,13 +38,15 @@ func TestStartSampling(t *testing.T) {

// override the processor queue length perf counter with a mock
// that will ensure a positive value is returned
assert.IsType(t, &pdh.PerfCounter{}, samplerInstance.processorQueueLengthCounter)
samplerInstance.processorQueueLengthCounter = pdh.NewMockPerfCounter(100.0)
assert.IsType(t, &perfcounters.PerfLibScraper{}, samplerInstance.perfCounterScraper)
samplerInstance.perfCounterScraper = perfcounters.NewMockPerfCounterScraper(map[string]map[string][]int64{
system: map[string][]int64{processorQueueLength: []int64{100}},
})

// second call to startSampling should succeed, but not do anything
startSampling(context.Background(), zap.NewNop())
assertSamplingUnderway(t)
assert.IsType(t, &pdh.MockPerfCounter{}, samplerInstance.processorQueueLengthCounter)
assert.IsType(t, &perfcounters.MockPerfCounterScraper{}, samplerInstance.perfCounterScraper)

// ensure that a positive load avg is returned by a call to
// "getSampledLoadAverages" which validates the value from the
Expand All @@ -68,7 +70,7 @@ func TestStartSampling(t *testing.T) {

func assertSamplingUnderway(t *testing.T) {
assert.NotNil(t, samplerInstance)
assert.NotNil(t, samplerInstance.processorQueueLengthCounter)
assert.NotNil(t, samplerInstance.perfCounterScraper)

select {
case <-samplerInstance.done:
Expand All @@ -78,10 +80,6 @@ func assertSamplingUnderway(t *testing.T) {
}

func assertSamplingStopped(t *testing.T) {
// validate perf counter was closed by trying to close again
err := samplerInstance.processorQueueLengthCounter.Close()
assert.EqualError(t, err, "attempted to call close more than once")

select {
case <-samplerInstance.done:
default:
Expand All @@ -90,9 +88,12 @@ func assertSamplingStopped(t *testing.T) {
}

func TestSampleLoad(t *testing.T) {
counterReturnValues := []interface{}{10.0, 20.0, 30.0, 40.0, 50.0}
mockCounter := pdh.NewMockPerfCounter(counterReturnValues...)
samplerInstance = &sampler{processorQueueLengthCounter: mockCounter}
counterReturnValues := []int64{10, 20, 30, 40, 50}
mockPerfCounterScraper := perfcounters.NewMockPerfCounterScraper(map[string]map[string][]int64{
system: map[string][]int64{processorQueueLength: counterReturnValues},
})

samplerInstance = &sampler{perfCounterScraper: mockPerfCounterScraper}

for i := 0; i < len(counterReturnValues); i++ {
samplerInstance.sampleLoad()
Expand All @@ -103,12 +104,12 @@ func TestSampleLoad(t *testing.T) {
assert.Equal(t, calcExpectedLoad(counterReturnValues, loadAvgFactor15m), samplerInstance.loadAvg15m)
}

func calcExpectedLoad(scrapedValues []interface{}, loadAvgFactor float64) float64 {
func calcExpectedLoad(scrapedValues []int64, loadAvgFactor float64) float64 {
// replicate the calculations that should be performed to determine the exponentially
// weighted moving averages based on the specified scraped values
var expectedLoad float64
for i := 0; i < len(scrapedValues); i++ {
expectedLoad = expectedLoad*loadAvgFactor + scrapedValues[i].(float64)*(1-loadAvgFactor)
expectedLoad = expectedLoad*loadAvgFactor + float64(scrapedValues[i])*(1-loadAvgFactor)
}
return expectedLoad
}
Expand Down

0 comments on commit 8f265cc

Please sign in to comment.