From 6f682e3645565242451048fb93f6e8f5e0d8880a Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Wed, 23 Sep 2020 15:27:12 +1000 Subject: [PATCH] Update hostmetrics disk scraper to use new perfcounters package --- .../diskscraper/disk_scraper_others_test.go | 51 +-- .../scraper/diskscraper/disk_scraper_test.go | 39 +- .../diskscraper/disk_scraper_windows.go | 367 +++--------------- .../diskscraper/disk_scraper_windows_test.go | 64 +-- 4 files changed, 119 insertions(+), 402 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go index 983982cd45a..91bdfdbc7e3 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go @@ -19,41 +19,25 @@ package diskscraper import ( "context" "errors" - "runtime" "testing" "github.com/shirou/gopsutil/disk" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/consumer/pdata" ) func TestScrapeMetrics_Others(t *testing.T) { type testCase struct { - name string - bootTimeFunc func() (uint64, error) - ioCountersFunc func(names ...string) (map[string]disk.IOCountersStat, error) - expectedStartTime pdata.TimestampUnixNano - initializationErr string - expectedErr string + name string + ioCountersFunc func(names ...string) (map[string]disk.IOCountersStat, error) + expectedErr string } testCases := []testCase{ - { - name: "Validate Start Time", - bootTimeFunc: func() (uint64, error) { return 100, nil }, - expectedStartTime: 100 * 1e9, - }, - { - name: "Boot Time Error", - bootTimeFunc: func() (uint64, error) { return 0, errors.New("err1") }, - initializationErr: "err1", - }, { name: "Error", - ioCountersFunc: func(names ...string) (map[string]disk.IOCountersStat, error) { return nil, errors.New("err2") }, - expectedErr: "err2", + ioCountersFunc: func(names ...string) (map[string]disk.IOCountersStat, error) { return nil, errors.New("err1") }, + expectedErr: "err1", }, } @@ -62,37 +46,16 @@ func TestScrapeMetrics_Others(t *testing.T) { scraper, err := newDiskScraper(context.Background(), &Config{}) require.NoError(t, err, "Failed to create disk scraper: %v", err) - if test.bootTimeFunc != nil { - scraper.bootTime = test.bootTimeFunc - } if test.ioCountersFunc != nil { scraper.ioCounters = test.ioCountersFunc } err = scraper.Initialize(context.Background()) - if test.initializationErr != "" { - assert.EqualError(t, err, test.initializationErr) - return - } require.NoError(t, err, "Failed to initialize disk scraper: %v", err) defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) - if test.expectedErr != "" { - assert.EqualError(t, err, test.expectedErr) - return - } - - require.NoError(t, err, "Failed to scrape metrics: %v", err) - - assertInt64DiskMetricValid(t, metrics.At(0), diskIODescriptor, test.expectedStartTime) - assertInt64DiskMetricValid(t, metrics.At(1), diskOpsDescriptor, test.expectedStartTime) - assertDoubleDiskMetricValid(t, metrics.At(2), diskTimeDescriptor, test.expectedStartTime) - assertDiskPendingOperationsMetricValid(t, metrics.At(3)) - - if runtime.GOOS == "linux" { - assertInt64DiskMetricValid(t, metrics.At(4), diskMergedDescriptor, test.expectedStartTime) - } + _, err = scraper.ScrapeMetrics(context.Background()) + assert.EqualError(t, err, test.expectedErr) }) } } diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go index f6dbebdde93..defd185f299 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go @@ -16,6 +16,7 @@ package diskscraper import ( "context" + "errors" "runtime" "testing" @@ -29,10 +30,13 @@ import ( func TestScrapeMetrics(t *testing.T) { type testCase struct { - name string - config Config - expectMetrics bool - newErrRegex string + name string + config Config + bootTimeFunc func() (uint64, error) + newErrRegex string + initializationErr string + expectMetrics bool + expectedStartTime pdata.TimestampUnixNano } testCases := []testCase{ @@ -40,6 +44,17 @@ func TestScrapeMetrics(t *testing.T) { name: "Standard", expectMetrics: true, }, + { + name: "Validate Start Time", + bootTimeFunc: func() (uint64, error) { return 100, nil }, + expectMetrics: true, + expectedStartTime: 100 * 1e9, + }, + { + name: "Boot Time Error", + bootTimeFunc: func() (uint64, error) { return 0, errors.New("err1") }, + initializationErr: "err1", + }, { name: "Include Filter that matches nothing", config: Config{Include: MatchConfig{filterset.Config{MatchType: "strict"}, []string{"@*^#&*$^#)"}}}, @@ -67,7 +82,15 @@ func TestScrapeMetrics(t *testing.T) { } require.NoError(t, err, "Failed to create disk scraper: %v", err) + if test.bootTimeFunc != nil { + scraper.bootTime = test.bootTimeFunc + } + err = scraper.Initialize(context.Background()) + if test.initializationErr != "" { + assert.EqualError(t, err, test.initializationErr) + return + } require.NoError(t, err, "Failed to initialize disk scraper: %v", err) defer func() { assert.NoError(t, scraper.Close(context.Background())) }() @@ -81,13 +104,13 @@ func TestScrapeMetrics(t *testing.T) { assert.GreaterOrEqual(t, metrics.Len(), 4) - assertInt64DiskMetricValid(t, metrics.At(0), diskIODescriptor, 0) - assertInt64DiskMetricValid(t, metrics.At(1), diskOpsDescriptor, 0) - assertDoubleDiskMetricValid(t, metrics.At(2), diskTimeDescriptor, 0) + assertInt64DiskMetricValid(t, metrics.At(0), diskIODescriptor, test.expectedStartTime) + assertInt64DiskMetricValid(t, metrics.At(1), diskOpsDescriptor, test.expectedStartTime) + assertDoubleDiskMetricValid(t, metrics.At(2), diskTimeDescriptor, test.expectedStartTime) assertDiskPendingOperationsMetricValid(t, metrics.At(3)) if runtime.GOOS == "linux" { - assertInt64DiskMetricValid(t, metrics.At(4), diskMergedDescriptor, 0) + assertInt64DiskMetricValid(t, metrics.At(4), diskMergedDescriptor, test.expectedStartTime) } internal.AssertSameTimeStampForAllMetrics(t, metrics) diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go index 09a68de9124..af3021ebe95 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go @@ -17,76 +17,47 @@ package diskscraper import ( "context" "fmt" - "reflect" "time" - "go.opentelemetry.io/collector/component/componenterror" + "github.com/shirou/gopsutil/host" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/third_party/telegraf/win_perf_counters" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh" + "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" ) const ( - logicalDiskReadsPerSecPath = `\LogicalDisk(*)\Disk Reads/sec` - logicalDiskWritesPerSecPath = `\LogicalDisk(*)\Disk Writes/sec` + logicalDisk = "LogicalDisk" + + readsPerSec = "Disk Reads/sec" + writesPerSec = "Disk Writes/sec" - logicalDiskReadBytesPerSecPath = `\LogicalDisk(*)\Disk Read Bytes/sec` - logicalDiskWriteBytesPerSecPath = `\LogicalDisk(*)\Disk Write Bytes/sec` + readBytesPerSec = "Disk Read Bytes/sec" + writeBytesPerSec = "Disk Write Bytes/sec" - logicalAvgDiskSecsPerReadPath = `\LogicalDisk(*)\Avg. Disk sec/Read` - logicalAvgDiskSecsPerWritePath = `\LogicalDisk(*)\Avg. Disk sec/Write` + avgDiskSecsPerRead = "Avg. Disk sec/Read" + avgDiskSecsPerWrite = "Avg. Disk sec/Write" - logicalDiskQueueLengthPath = `\LogicalDisk(*)\Current Disk Queue Length` + queueLength = "Current Disk Queue Length" ) // scraper for Disk Metrics type scraper struct { - config *Config - startTime pdata.TimestampUnixNano - prevScrapeTime time.Time - includeFS filterset.FilterSet - excludeFS filterset.FilterSet - - diskReadBytesPerSecCounter pdh.PerfCounterScraper - diskWriteBytesPerSecCounter pdh.PerfCounterScraper - diskReadsPerSecCounter pdh.PerfCounterScraper - diskWritesPerSecCounter pdh.PerfCounterScraper - avgDiskSecsPerReadCounter pdh.PerfCounterScraper - avgDiskSecsPerWriteCounter pdh.PerfCounterScraper - diskQueueLengthCounter pdh.PerfCounterScraper - - cumulativeDiskIO cumulativeDiskValues - cumulativeDiskOps cumulativeDiskValues - cumulativeDiskTime cumulativeDiskValues -} + config *Config + startTime pdata.TimestampUnixNano + includeFS filterset.FilterSet + excludeFS filterset.FilterSet -type cumulativeDiskValues map[string]*value - -func (cv cumulativeDiskValues) getOrAdd(k string) *value { - if v, ok := cv[k]; ok { - return v - } + perfCounterScraper perfcounters.PerfCounterScraper - v := &value{} - cv[k] = v - return v -} - -type value struct { - read float64 - write float64 + // for mocking + bootTime func() (uint64, error) } // newDiskScraper creates a Disk Scraper func newDiskScraper(_ context.Context, cfg *Config) (*scraper, error) { - scraper := &scraper{ - config: cfg, - cumulativeDiskIO: map[string]*value{}, - cumulativeDiskOps: map[string]*value{}, - cumulativeDiskTime: map[string]*value{}, - } + scraper := &scraper{config: cfg, perfCounterScraper: &perfcounters.PerfLibScraper{}, bootTime: host.BootTime} var err error @@ -109,279 +80,96 @@ func newDiskScraper(_ context.Context, cfg *Config) (*scraper, error) { // Initialize func (s *scraper) Initialize(_ context.Context) error { - s.startTime = internal.TimeToUnixNano(time.Now()) - s.prevScrapeTime = time.Now() - - var err error - - s.diskReadBytesPerSecCounter, err = pdh.NewPerfCounter(logicalDiskReadBytesPerSecPath, true) + bootTime, err := s.bootTime() if err != nil { return err } - s.diskWriteBytesPerSecCounter, err = pdh.NewPerfCounter(logicalDiskWriteBytesPerSecPath, true) - if err != nil { - return err - } + s.startTime = pdata.TimestampUnixNano(bootTime * 1e9) - s.diskReadsPerSecCounter, err = pdh.NewPerfCounter(logicalDiskReadsPerSecPath, true) - if err != nil { - return err - } - - s.diskWritesPerSecCounter, err = pdh.NewPerfCounter(logicalDiskWritesPerSecPath, true) - if err != nil { - return err - } - - s.avgDiskSecsPerReadCounter, err = pdh.NewPerfCounter(logicalAvgDiskSecsPerReadPath, true) - if err != nil { - return err - } - - s.avgDiskSecsPerWriteCounter, err = pdh.NewPerfCounter(logicalAvgDiskSecsPerWritePath, true) - if err != nil { - return err - } - - s.diskQueueLengthCounter, err = pdh.NewPerfCounter(logicalDiskQueueLengthPath, true) - if err != nil { - return err - } - - return nil + return s.perfCounterScraper.Initialize(logicalDisk) } // Close func (s *scraper) Close(_ context.Context) error { - var errors []error - - if s.diskReadBytesPerSecCounter != nil && !reflect.ValueOf(s.diskReadBytesPerSecCounter).IsNil() { - if err := s.diskReadBytesPerSecCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - if s.diskWriteBytesPerSecCounter != nil && !reflect.ValueOf(s.diskWriteBytesPerSecCounter).IsNil() { - if err := s.diskWriteBytesPerSecCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - if s.diskReadsPerSecCounter != nil && !reflect.ValueOf(s.diskReadsPerSecCounter).IsNil() { - if err := s.diskReadsPerSecCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - if s.diskWritesPerSecCounter != nil && !reflect.ValueOf(s.diskWritesPerSecCounter).IsNil() { - if err := s.diskWritesPerSecCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - if s.avgDiskSecsPerReadCounter != nil && !reflect.ValueOf(s.avgDiskSecsPerReadCounter).IsNil() { - if err := s.avgDiskSecsPerReadCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - if s.avgDiskSecsPerWriteCounter != nil && !reflect.ValueOf(s.avgDiskSecsPerWriteCounter).IsNil() { - if err := s.avgDiskSecsPerWriteCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - if s.diskQueueLengthCounter != nil && !reflect.ValueOf(s.diskQueueLengthCounter).IsNil() { - if err := s.diskQueueLengthCounter.Close(); err != nil { - errors = append(errors, err) - } - } - - return componenterror.CombineErrors(errors) + return nil } // ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { - now := time.Now() - durationSinceLastScraped := now.Sub(s.prevScrapeTime).Seconds() - s.prevScrapeTime = now - nowUnixTime := pdata.TimestampUnixNano(uint64(now.UnixNano())) - +func (s *scraper) ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() - var errors []error - - err := s.scrapeAndAppendDiskIOMetric(metrics, nowUnixTime, durationSinceLastScraped) - if err != nil { - errors = append(errors, err) - } + now := internal.TimeToUnixNano(time.Now()) - err = s.scrapeAndAppendDiskOpsMetric(metrics, nowUnixTime, durationSinceLastScraped) + counters, err := s.perfCounterScraper.Scrape() if err != nil { - errors = append(errors, err) + return metrics, err } - err = s.scrapeAndAppendDiskPendingOperationsMetric(metrics, nowUnixTime) + logicalDiskObject, err := counters.GetObject(logicalDisk) if err != nil { - errors = append(errors, err) + return metrics, err } - return metrics, componenterror.CombineErrors(errors) -} + // filter devices by name + logicalDiskObject.Filter(s.includeFS, s.excludeFS, false) -func (s *scraper) scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, now pdata.TimestampUnixNano, durationSinceLastScraped float64) error { - diskReadBytesPerSecValues, err := s.diskReadBytesPerSecCounter.ScrapeData() + logicalDiskCounterValues, err := logicalDiskObject.GetValues(readsPerSec, writesPerSec, readBytesPerSec, writeBytesPerSec, avgDiskSecsPerRead, avgDiskSecsPerWrite, queueLength) if err != nil { - return err + return metrics, err } - diskWriteBytesPerSecValues, err := s.diskWriteBytesPerSecCounter.ScrapeData() - if err != nil { - return err - } - - for _, diskReadBytesPerSec := range diskReadBytesPerSecValues { - if s.includeDevice(diskReadBytesPerSec.InstanceName) { - s.cumulativeDiskIO.getOrAdd(diskReadBytesPerSec.InstanceName).read += diskReadBytesPerSec.Value * durationSinceLastScraped - } - } - - for _, diskWriteBytesPerSec := range diskWriteBytesPerSecValues { - if s.includeDevice(diskWriteBytesPerSec.InstanceName) { - s.cumulativeDiskIO.getOrAdd(diskWriteBytesPerSec.InstanceName).write += diskWriteBytesPerSec.Value * durationSinceLastScraped - } - } - - if len(s.cumulativeDiskIO) == 0 { - return nil - } - - idx := metrics.Len() - metrics.Resize(idx + 1) - initializeDiskInt64Metric(metrics.At(idx), diskIODescriptor, s.startTime, now, s.cumulativeDiskIO) - return nil -} - -func (s *scraper) scrapeAndAppendDiskOpsMetric(metrics pdata.MetricSlice, now pdata.TimestampUnixNano, durationSinceLastScraped float64) error { - diskReadsPerSecValues, err := s.diskReadsPerSecCounter.ScrapeData() - if err != nil { - return err - } - - diskWritesPerSecValues, err := s.diskWritesPerSecCounter.ScrapeData() - if err != nil { - return err - } - - avgDiskSecsPerReadValues, err := s.avgDiskSecsPerReadCounter.ScrapeData() - if err != nil { - return err - } - avgDiskSecsPerReadMap := toMap(avgDiskSecsPerReadValues) - - avgDiskSecsPerWriteValues, err := s.avgDiskSecsPerWriteCounter.ScrapeData() - if err != nil { - return err - } - avgDiskSecsPerWriteMap := toMap(avgDiskSecsPerWriteValues) - - for _, diskReadsPerSec := range diskReadsPerSecValues { - device := diskReadsPerSec.InstanceName - if !s.includeDevice(device) { - continue - } - - deltaReadOperations := diskReadsPerSec.Value * durationSinceLastScraped - s.cumulativeDiskOps.getOrAdd(device).read += deltaReadOperations - if avgDiskSecsPerRead, ok := avgDiskSecsPerReadMap[device]; ok { - s.cumulativeDiskTime.getOrAdd(device).read += deltaReadOperations * avgDiskSecsPerRead - } - } - - for _, diskWritesPerSec := range diskWritesPerSecValues { - device := diskWritesPerSec.InstanceName - if !s.includeDevice(device) { - continue - } - - deltaWriteOperations := diskWritesPerSec.Value * durationSinceLastScraped - s.cumulativeDiskOps.getOrAdd(device).write += deltaWriteOperations - if avgDiskSecsPerWrite, ok := avgDiskSecsPerWriteMap[device]; ok { - s.cumulativeDiskTime.getOrAdd(device).write += deltaWriteOperations * avgDiskSecsPerWrite - } - } - - idx := metrics.Len() - - if len(s.cumulativeDiskIO) > 0 { - metrics.Resize(idx + 1) - initializeDiskInt64Metric(metrics.At(idx), diskOpsDescriptor, s.startTime, now, s.cumulativeDiskOps) - idx++ - } - - if len(s.cumulativeDiskTime) > 0 { - metrics.Resize(idx + 1) - initializeDiskDoubleMetric(metrics.At(idx), diskTimeDescriptor, s.startTime, now, s.cumulativeDiskTime) - idx++ + if len(logicalDiskCounterValues) > 0 { + metrics.Resize(4) + initializeDiskIOMetric(metrics.At(0), s.startTime, now, logicalDiskCounterValues) + initializeDiskOpsMetric(metrics.At(1), s.startTime, now, logicalDiskCounterValues) + initializeDiskTimeMetric(metrics.At(2), s.startTime, now, logicalDiskCounterValues) + initializeDiskPendingOperationsMetric(metrics.At(3), now, logicalDiskCounterValues) } - return nil + return metrics, nil } -func (s *scraper) scrapeAndAppendDiskPendingOperationsMetric(metrics pdata.MetricSlice, now pdata.TimestampUnixNano) error { - diskQueueLengthValues, err := s.diskQueueLengthCounter.ScrapeData() - if err != nil { - return err - } +func initializeDiskIOMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, logicalDiskCounterValues []*perfcounters.CounterValues) { + diskIODescriptor.CopyTo(metric) - filteredDiskQueueLengthValues := s.filterByDevice(diskQueueLengthValues) - if len(filteredDiskQueueLengthValues) == 0 { - return nil + idps := metric.IntSum().DataPoints() + idps.Resize(2 * len(logicalDiskCounterValues)) + for idx, logicalDiskCounter := range logicalDiskCounterValues { + initializeInt64DataPoint(idps.At(2*idx+0), startTime, now, logicalDiskCounter.InstanceName, readDirectionLabelValue, logicalDiskCounter.Values[readBytesPerSec]) + initializeInt64DataPoint(idps.At(2*idx+1), startTime, now, logicalDiskCounter.InstanceName, writeDirectionLabelValue, logicalDiskCounter.Values[writeBytesPerSec]) } - - idx := metrics.Len() - metrics.Resize(idx + 1) - initializeDiskPendingOperationsMetric(metrics.At(idx), now, filteredDiskQueueLengthValues) - return nil } -func initializeDiskInt64Metric(metric pdata.Metric, descriptor pdata.Metric, startTime, now pdata.TimestampUnixNano, ops cumulativeDiskValues) { - descriptor.CopyTo(metric) +func initializeDiskOpsMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, logicalDiskCounterValues []*perfcounters.CounterValues) { + diskOpsDescriptor.CopyTo(metric) idps := metric.IntSum().DataPoints() - idps.Resize(2 * len(ops)) - - idx := 0 - for device, value := range ops { - initializeInt64DataPoint(idps.At(idx+0), startTime, now, device, readDirectionLabelValue, int64(value.read)) - initializeInt64DataPoint(idps.At(idx+1), startTime, now, device, writeDirectionLabelValue, int64(value.write)) - idx += 2 + idps.Resize(2 * len(logicalDiskCounterValues)) + for idx, logicalDiskCounter := range logicalDiskCounterValues { + initializeInt64DataPoint(idps.At(2*idx+0), startTime, now, logicalDiskCounter.InstanceName, readDirectionLabelValue, logicalDiskCounter.Values[readsPerSec]) + initializeInt64DataPoint(idps.At(2*idx+1), startTime, now, logicalDiskCounter.InstanceName, writeDirectionLabelValue, logicalDiskCounter.Values[writesPerSec]) } } -func initializeDiskDoubleMetric(metric pdata.Metric, descriptor pdata.Metric, startTime, now pdata.TimestampUnixNano, ops cumulativeDiskValues) { - descriptor.CopyTo(metric) +func initializeDiskTimeMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, logicalDiskCounterValues []*perfcounters.CounterValues) { + diskTimeDescriptor.CopyTo(metric) ddps := metric.DoubleSum().DataPoints() - ddps.Resize(2 * len(ops)) - - idx := 0 - for device, value := range ops { - initializeDoubleDataPoint(ddps.At(idx+0), startTime, now, device, readDirectionLabelValue, value.read) - initializeDoubleDataPoint(ddps.At(idx+1), startTime, now, device, writeDirectionLabelValue, value.write) - idx += 2 + ddps.Resize(2 * len(logicalDiskCounterValues)) + for idx, logicalDiskCounter := range logicalDiskCounterValues { + initializeDoubleDataPoint(ddps.At(2*idx+0), startTime, now, logicalDiskCounter.InstanceName, readDirectionLabelValue, float64(logicalDiskCounter.Values[avgDiskSecsPerRead])/1e7) + initializeDoubleDataPoint(ddps.At(2*idx+1), startTime, now, logicalDiskCounter.InstanceName, writeDirectionLabelValue, float64(logicalDiskCounter.Values[avgDiskSecsPerWrite])/1e7) } } -func initializeDiskPendingOperationsMetric(metric pdata.Metric, now pdata.TimestampUnixNano, avgDiskQueueLengthValues []win_perf_counters.CounterValue) { +func initializeDiskPendingOperationsMetric(metric pdata.Metric, now pdata.TimestampUnixNano, logicalDiskCounterValues []*perfcounters.CounterValues) { diskPendingOperationsDescriptor.CopyTo(metric) idps := metric.IntSum().DataPoints() - idps.Resize(len(avgDiskQueueLengthValues)) - - for idx, avgDiskQueueLengthValue := range avgDiskQueueLengthValues { - initializeDiskPendingDataPoint(idps.At(idx), now, avgDiskQueueLengthValue.InstanceName, int64(avgDiskQueueLengthValue.Value)) + idps.Resize(len(logicalDiskCounterValues)) + for idx, logicalDiskCounter := range logicalDiskCounterValues { + initializeDiskPendingDataPoint(idps.At(idx), now, logicalDiskCounter.InstanceName, logicalDiskCounter.Values[queueLength]) } } @@ -409,30 +197,3 @@ func initializeDiskPendingDataPoint(dataPoint pdata.IntDataPoint, now pdata.Time dataPoint.SetTimestamp(now) dataPoint.SetValue(value) } - -func (s *scraper) filterByDevice(values []win_perf_counters.CounterValue) []win_perf_counters.CounterValue { - if s.includeFS == nil && s.excludeFS == nil { - return values - } - - filteredValues := make([]win_perf_counters.CounterValue, 0, len(values)) - for _, value := range values { - if s.includeDevice(value.InstanceName) { - filteredValues = append(filteredValues, value) - } - } - return filteredValues -} - -func (s *scraper) includeDevice(deviceName string) bool { - return (s.includeFS == nil || s.includeFS.Matches(deviceName)) && - (s.excludeFS == nil || !s.excludeFS.Matches(deviceName)) -} - -func toMap(values []win_perf_counters.CounterValue) map[string]float64 { - mp := make(map[string]float64, len(values)) - for _, value := range values { - mp[value.InstanceName] = value.Value - } - return mp -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go index 250e34ad98c..3e41e3147f3 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go @@ -24,57 +24,33 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/windows/pdh" + "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" ) func TestScrapeMetrics_Error(t *testing.T) { type testCase struct { - name string - diskReadBytesPerSecCounterReturnValue interface{} - diskWriteBytesPerSecCounterReturnValue interface{} - diskReadsPerSecCounterReturnValue interface{} - diskWritesPerSecCounterReturnValue interface{} - avgDiskSecsPerReadCounterReturnValue interface{} - avgDiskSecsPerWriteCounterReturnValue interface{} - diskQueueLengthCounterReturnValue interface{} - expectedErr string + name string + scrapeErr error + getObjectErr error + getValuesErr error + expectedErr string } testCases := []testCase{ { - name: "readBytesPerSecCounterError", - diskReadBytesPerSecCounterReturnValue: errors.New("err1"), - expectedErr: "err1", + name: "scrapeError", + scrapeErr: errors.New("err1"), + expectedErr: "err1", }, { - name: "writeBytesPerSecCounterError", - diskWriteBytesPerSecCounterReturnValue: errors.New("err1"), - expectedErr: "err1", + name: "getObjectErr", + getObjectErr: errors.New("err1"), + expectedErr: "err1", }, { - name: "readsPerSecCounterError", - diskReadsPerSecCounterReturnValue: errors.New("err1"), - expectedErr: "err1", - }, - { - name: "writesPerSecCounterError", - diskWritesPerSecCounterReturnValue: errors.New("err1"), - expectedErr: "err1", - }, - { - name: "avgSecsPerReadCounterError", - avgDiskSecsPerReadCounterReturnValue: errors.New("err1"), - expectedErr: "err1", - }, - { - name: "avgSecsPerReadWriteError", - avgDiskSecsPerWriteCounterReturnValue: errors.New("err1"), - expectedErr: "err1", - }, - { - name: "avgDiskQueueLengthError", - diskQueueLengthCounterReturnValue: errors.New("err1"), - expectedErr: "err1", + name: "getValuesErr", + getValuesErr: errors.New("err1"), + expectedErr: "err1", }, } @@ -83,18 +59,12 @@ func TestScrapeMetrics_Error(t *testing.T) { scraper, err := newDiskScraper(context.Background(), &Config{}) require.NoError(t, err, "Failed to create disk scraper: %v", err) + scraper.perfCounterScraper = perfcounters.NewMockPerfCounterScraperError(test.scrapeErr, test.getObjectErr, test.getValuesErr) + err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize disk scraper: %v", err) defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - scraper.diskReadBytesPerSecCounter = pdh.NewMockPerfCounter(test.diskReadBytesPerSecCounterReturnValue) - scraper.diskWriteBytesPerSecCounter = pdh.NewMockPerfCounter(test.diskWriteBytesPerSecCounterReturnValue) - scraper.diskReadsPerSecCounter = pdh.NewMockPerfCounter(test.diskReadsPerSecCounterReturnValue) - scraper.diskWritesPerSecCounter = pdh.NewMockPerfCounter(test.diskWritesPerSecCounterReturnValue) - scraper.avgDiskSecsPerReadCounter = pdh.NewMockPerfCounter(test.avgDiskSecsPerReadCounterReturnValue) - scraper.avgDiskSecsPerWriteCounter = pdh.NewMockPerfCounter(test.avgDiskSecsPerWriteCounterReturnValue) - scraper.diskQueueLengthCounter = pdh.NewMockPerfCounter(test.diskQueueLengthCounterReturnValue) - _, err = scraper.ScrapeMetrics(context.Background()) assert.EqualError(t, err, test.expectedErr) })