From a9a2c663b080e75a9c105a83465cba7fbcab4ed5 Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Sat, 26 Sep 2020 15:32:00 +1000 Subject: [PATCH] Update hostmetrics load scraper to use new perfcounters package --- .../perfcounters/perfcounter_scraper_mock.go | 89 +++++++++++++++++++ .../loadscraper/load_scraper_windows.go | 49 ++++++---- .../loadscraper/load_scraper_windows_test.go | 29 +++--- 3 files changed, 136 insertions(+), 31 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/perfcounters/perfcounter_scraper_mock.go b/receiver/hostmetricsreceiver/internal/perfcounters/perfcounter_scraper_mock.go index 4b47acf8706..eafa377bc9b 100644 --- a/receiver/hostmetricsreceiver/internal/perfcounters/perfcounter_scraper_mock.go +++ b/receiver/hostmetricsreceiver/internal/perfcounters/perfcounter_scraper_mock.go @@ -17,6 +17,8 @@ package perfcounters import ( + "fmt" + "go.opentelemetry.io/collector/internal/processor/filterset" ) @@ -76,3 +78,90 @@ func (obj mockPerfDataObjectError) Filter(includeFS, excludeFS filterset.FilterS func (obj mockPerfDataObjectError) GetValues(counterNames ...string) ([]*CounterValues, error) { return nil, obj.getValuesErr } + +// MockPerfCounterScraper is an implementation of PerfCounterScraper that returns the supplied +// object / counter values on each successive call to Scrape, in the specified order. +// +// Example Usage: +// +// s := NewMockPerfCounterScraper(map[string]map[string][]int64{ +// "Object1": map[string][]int64{ +// "Counter1": []int64{1, 2}, +// "Counter2": []int64{4}, +// }, +// }) +// +// s.Scrape().GetObject("Object1").GetValues("Counter1", "Counter2") +// +// ... 1st call returns []*CounterValues{ { Values: { "Counter1": 1, "Counter2": 4 } } } +// ... 2nd call returns []*CounterValues{ { Values: { "Counter1": 2, "Counter2": 4 } } } +type MockPerfCounterScraper struct { + objectsAndValuesToReturn map[string]map[string][]int64 + timesCalled int +} + +// NewMockPerfCounterScraper returns a MockPerfCounterScraper that will return the supplied +// object / counter values on each successive call to Scrape, in the specified order. +func NewMockPerfCounterScraper(objectsAndValuesToReturn map[string]map[string][]int64) *MockPerfCounterScraper { + return &MockPerfCounterScraper{objectsAndValuesToReturn: objectsAndValuesToReturn} +} + +// Initialize is a no-op +func (p *MockPerfCounterScraper) Initialize(objects ...string) error { + return nil +} + +// Scrape returns a perf data collection with the supplied object / counter values, +// according to the supplied order. +func (p *MockPerfCounterScraper) Scrape() (PerfDataCollection, error) { + objectsAndValuesToReturn := make(map[string]map[string]int64, len(p.objectsAndValuesToReturn)) + for objectName, countersToReturn := range p.objectsAndValuesToReturn { + valuesToReturn := make(map[string]int64, len(countersToReturn)) + for counterName, orderedValuesToReturn := range countersToReturn { + returnIndex := p.timesCalled + if returnIndex >= len(orderedValuesToReturn) { + returnIndex = len(orderedValuesToReturn) - 1 + } + valuesToReturn[counterName] = orderedValuesToReturn[returnIndex] + } + objectsAndValuesToReturn[objectName] = valuesToReturn + } + + p.timesCalled++ + return mockPerfDataCollection{objectsAndValuesToReturn: objectsAndValuesToReturn}, nil +} + +type mockPerfDataCollection struct { + objectsAndValuesToReturn map[string]map[string]int64 +} + +// GetObject returns the specified object / counter values +func (p mockPerfDataCollection) GetObject(objectName string) (PerfDataObject, error) { + valuesToReturn, ok := p.objectsAndValuesToReturn[objectName] + if !ok { + return nil, fmt.Errorf("Unable to find object %q", objectName) + } + + return mockPerfDataObject{valuesToReturn: valuesToReturn}, nil +} + +type mockPerfDataObject struct { + valuesToReturn map[string]int64 +} + +// Filter is a no-op +func (obj mockPerfDataObject) Filter(includeFS, excludeFS filterset.FilterSet, includeTotal bool) { +} + +// GetValues returns the specified counter values +func (obj mockPerfDataObject) GetValues(counterNames ...string) ([]*CounterValues, error) { + value := &CounterValues{Values: map[string]int64{}} + for _, counterName := range counterNames { + valueToReturn, ok := obj.valuesToReturn[counterName] + if !ok { + return nil, fmt.Errorf("Mock Perf Counter Scraper configured incorrectly. Return value for counter %q not specified", counterName) + } + value.Values[counterName] = valueToReturn + } + return []*CounterValues{value}, nil +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go index 1da6e149e80..2dddfba7fc6 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go @@ -25,13 +25,16 @@ import ( "github.com/shirou/gopsutil/load" "go.uber.org/zap" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh" + "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" ) // Sample processor queue length at a 5s frequency, and calculate exponentially weighted moving averages // as per https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation -const processorQueueLengthPath = `\System\Processor Queue Length` +const ( + system = "System" + processorQueueLength = "Processor Queue Length" +) var ( samplingFrequency = 5 * time.Second @@ -49,13 +52,13 @@ var ( ) type sampler struct { - done chan struct{} - logger *zap.Logger - processorQueueLengthCounter pdh.PerfCounterScraper - loadAvg1m float64 - loadAvg5m float64 - loadAvg15m float64 - lock sync.RWMutex + done chan struct{} + logger *zap.Logger + perfCounterScraper perfcounters.PerfCounterScraper + loadAvg1m float64 + loadAvg5m float64 + loadAvg15m float64 + lock sync.RWMutex } func startSampling(_ context.Context, logger *zap.Logger) error { @@ -80,15 +83,15 @@ func startSampling(_ context.Context, logger *zap.Logger) error { } func newSampler(logger *zap.Logger) (*sampler, error) { - processorQueueLengthCounter, err := pdh.NewPerfCounter(processorQueueLengthPath, false) - if err != nil { + perfCounterScraper := &perfcounters.PerfLibScraper{} + if err := perfCounterScraper.Initialize(system); err != nil { return nil, err } sampler := &sampler{ - logger: logger, - processorQueueLengthCounter: processorQueueLengthCounter, - done: make(chan struct{}), + logger: logger, + perfCounterScraper: perfCounterScraper, + done: make(chan struct{}), } return sampler, nil @@ -111,13 +114,25 @@ func (sw *sampler) startSamplingTicker() { } func (sw *sampler) sampleLoad() { - counterValues, err := sw.processorQueueLengthCounter.ScrapeData() + counters, err := sw.perfCounterScraper.Scrape() if err != nil { sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err)) return } - currentLoad := counterValues[0].Value + systemObject, err := counters.GetObject(system) + if err != nil { + sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err)) + return + } + + counterValues, err := systemObject.GetValues(processorQueueLength) + if err != nil { + sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err)) + return + } + + currentLoad := float64(counterValues[0].Values[processorQueueLength]) sw.lock.Lock() defer sw.lock.Unlock() @@ -137,7 +152,7 @@ func stopSampling(_ context.Context) error { } close(samplerInstance.done) - return samplerInstance.processorQueueLengthCounter.Close() + return nil } func getSampledLoadAverages() (*load.AvgStat, error) { diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows_test.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows_test.go index ea9d4269212..01ea502635a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh" + "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" ) func TestStartSampling(t *testing.T) { @@ -38,13 +38,15 @@ func TestStartSampling(t *testing.T) { // override the processor queue length perf counter with a mock // that will ensure a positive value is returned - assert.IsType(t, &pdh.PerfCounter{}, samplerInstance.processorQueueLengthCounter) - samplerInstance.processorQueueLengthCounter = pdh.NewMockPerfCounter(100.0) + assert.IsType(t, &perfcounters.PerfLibScraper{}, samplerInstance.perfCounterScraper) + samplerInstance.perfCounterScraper = perfcounters.NewMockPerfCounterScraper(map[string]map[string][]int64{ + system: map[string][]int64{processorQueueLength: []int64{100}}, + }) // second call to startSampling should succeed, but not do anything startSampling(context.Background(), zap.NewNop()) assertSamplingUnderway(t) - assert.IsType(t, &pdh.MockPerfCounter{}, samplerInstance.processorQueueLengthCounter) + assert.IsType(t, &perfcounters.MockPerfCounterScraper{}, samplerInstance.perfCounterScraper) // ensure that a positive load avg is returned by a call to // "getSampledLoadAverages" which validates the value from the @@ -68,7 +70,7 @@ func TestStartSampling(t *testing.T) { func assertSamplingUnderway(t *testing.T) { assert.NotNil(t, samplerInstance) - assert.NotNil(t, samplerInstance.processorQueueLengthCounter) + assert.NotNil(t, samplerInstance.perfCounterScraper) select { case <-samplerInstance.done: @@ -78,10 +80,6 @@ func assertSamplingUnderway(t *testing.T) { } func assertSamplingStopped(t *testing.T) { - // validate perf counter was closed by trying to close again - err := samplerInstance.processorQueueLengthCounter.Close() - assert.EqualError(t, err, "attempted to call close more than once") - select { case <-samplerInstance.done: default: @@ -90,9 +88,12 @@ func assertSamplingStopped(t *testing.T) { } func TestSampleLoad(t *testing.T) { - counterReturnValues := []interface{}{10.0, 20.0, 30.0, 40.0, 50.0} - mockCounter := pdh.NewMockPerfCounter(counterReturnValues...) - samplerInstance = &sampler{processorQueueLengthCounter: mockCounter} + counterReturnValues := []int64{10, 20, 30, 40, 50} + mockPerfCounterScraper := perfcounters.NewMockPerfCounterScraper(map[string]map[string][]int64{ + system: map[string][]int64{processorQueueLength: counterReturnValues}, + }) + + samplerInstance = &sampler{perfCounterScraper: mockPerfCounterScraper} for i := 0; i < len(counterReturnValues); i++ { samplerInstance.sampleLoad() @@ -103,12 +104,12 @@ func TestSampleLoad(t *testing.T) { assert.Equal(t, calcExpectedLoad(counterReturnValues, loadAvgFactor15m), samplerInstance.loadAvg15m) } -func calcExpectedLoad(scrapedValues []interface{}, loadAvgFactor float64) float64 { +func calcExpectedLoad(scrapedValues []int64, loadAvgFactor float64) float64 { // replicate the calculations that should be performed to determine the exponentially // weighted moving averages based on the specified scraped values var expectedLoad float64 for i := 0; i < len(scrapedValues); i++ { - expectedLoad = expectedLoad*loadAvgFactor + scrapedValues[i].(float64)*(1-loadAvgFactor) + expectedLoad = expectedLoad*loadAvgFactor + float64(scrapedValues[i])*(1-loadAvgFactor) } return expectedLoad }