Skip to content

Commit

Permalink
Initial commit of host metrics cpu scraper using gopsutil to collect …
Browse files Browse the repository at this point in the history
…cpu times metric
  • Loading branch information
james-bebbington committed Apr 26, 2020
1 parent 7082daa commit 9dffd72
Show file tree
Hide file tree
Showing 18 changed files with 665 additions and 35 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/hashicorp/consul/api v1.2.0 // indirect
github.com/jaegertracing/jaeger v1.17.0
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024
github.com/mitchellh/mapstructure v1.2.2
github.com/mitchellh/mapstructure v1.2.2 // indirect
github.com/open-telemetry/opentelemetry-proto v0.3.0
github.com/openzipkin/zipkin-go v0.2.1
github.com/orijtech/prometheus-go-metrics-exporter v0.0.4
Expand All @@ -48,7 +48,7 @@ require (
github.com/spf13/cobra v0.0.6
github.com/spf13/viper v1.6.2
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/tcnksm/ghr v0.13.0
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-lib v2.2.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/subosito/gotenv v1.1.1/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
Expand Down
2 changes: 1 addition & 1 deletion receiver/hostmetricsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ hostmetrics:
default_collection_interval: 10s
scrapers:
cpu:
report_per_process: true
report_per_cpu: true
memory:
disk:
```
Expand Down
41 changes: 39 additions & 2 deletions receiver/hostmetricsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ package hostmetricsreceiver
import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -29,7 +34,39 @@ func TestLoadConfig(t *testing.T) {

factory := NewFactory()
factories.Receivers[typeStr] = factory
_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

require.Error(t, err)
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 2)

r0 := cfg.Receivers["hostmetrics"]
defaultConfigAllScrapers := factory.CreateDefaultConfig()
defaultConfigAllScrapers.(*Config).Scrapers = map[string]internal.Config{
cpuscraper.TypeStr: getDefaultConfigWithDefaultCollectionInterval(&cpuscraper.Factory{}),
}
assert.Equal(t, r0, defaultConfigAllScrapers)

r1 := cfg.Receivers["hostmetrics/customname"].(*Config)
assert.Equal(t, r1,
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "hostmetrics/customname",
},
DefaultCollectionInterval: 10 * time.Second,
Scrapers: map[string]internal.Config{
cpuscraper.TypeStr: &cpuscraper.Config{
ConfigSettings: internal.ConfigSettings{CollectionIntervalValue: 5 * time.Second},
ReportPerCPU: true,
},
},
})
}

func getDefaultConfigWithDefaultCollectionInterval(factory internal.Factory) internal.Config {
cfg := factory.CreateDefaultConfig()
cfg.SetCollectionInterval(10 * time.Second)
return cfg
}
4 changes: 3 additions & 1 deletion receiver/hostmetricsreceiver/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ extensions:

receivers:
hostmetrics:
default_collection_interval: 10s
default_collection_interval: 60s
scrapers:
cpu:
report_per_cpu: true

exporters:
logging:
Expand Down
29 changes: 20 additions & 9 deletions receiver/hostmetricsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
)

// This file implements Factory for HostMetrics receiver.
Expand All @@ -40,17 +41,19 @@ const (

// Factory is the Factory for receiver.
type Factory struct {
ScraperFactories map[string]internal.Factory
scraperFactories map[string]internal.Factory
}

// NewFactory creates a new factory
// NewFactory creates a new factory for host metrics receiver.
func NewFactory() *Factory {
return &Factory{
ScraperFactories: map[string]internal.Factory{},
scraperFactories: map[string]internal.Factory{
cpuscraper.TypeStr: &cpuscraper.Factory{},
},
}
}

// Type gets the type of the Receiver config created by this Factory.
// Type returns the type of the Receiver config created by this Factory.
func (f *Factory) Type() configmodels.Type {
return typeStr
}
Expand All @@ -71,6 +74,10 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return fmt.Errorf("config type not hostmetrics.Config")
}

if cfg.DefaultCollectionInterval <= 0 {
return fmt.Errorf("default_collection_interval must be a positive number")
}

// dynamically load the individual collector configs based on the key name

cfg.Scrapers = map[string]internal.Config{}
Expand All @@ -81,20 +88,24 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
}

for key := range componentViperSection.GetStringMap(scrapersKey) {
factory, ok := f.ScraperFactories[key]
factory, ok := f.scraperFactories[key]
if !ok {
return fmt.Errorf("invalid hostmetrics scraper key: %s", key)
return fmt.Errorf("invalid scraper key: %s", key)
}

collectorCfg := factory.CreateDefaultConfig()
collectorViperSection := scrapersViperSection.Sub(key)
if collectorViperSection != nil {
err := collectorViperSection.UnmarshalExact(collectorCfg)
if err != nil {
return fmt.Errorf("error reading settings for hostmetric scraper type %q: %v", key, err)
return fmt.Errorf("error reading settings for scraper type %q: %v", key, err)
}
}

if collectorCfg.CollectionInterval() <= 0 {
collectorCfg.SetCollectionInterval(cfg.DefaultCollectionInterval)
}

cfg.Scrapers[key] = collectorCfg
}

Expand All @@ -113,7 +124,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
// CreateTraceReceiver returns error as trace receiver is not applicable to host metrics receiver.
func (f *Factory) CreateTraceReceiver(
ctx context.Context,
params component.ReceiverCreateParams,
Expand All @@ -138,7 +149,7 @@ func (f *Factory) CreateMetricsReceiver(

config := cfg.(*Config)

hmr, err := NewHostMetricsReceiver(ctx, params.Logger, config, f.ScraperFactories, consumer)
hmr, err := NewHostMetricsReceiver(ctx, params.Logger, config, f.scraperFactories, consumer)
if err != nil {
return nil, err
}
Expand Down
13 changes: 3 additions & 10 deletions receiver/hostmetricsreceiver/hostmetrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
)

// Receiver is the type used to handle metrics from VM metrics.
// Receiver is the type that scrapes various host metrics.
type Receiver struct {
consumer consumer.MetricsConsumer
config *Config
scrapers []internal.Scraper
cancel context.CancelFunc
}

// NewHostMetricsReceiver creates a new set of VM and Process Metrics
// NewHostMetricsReceiver creates a host metrics scraper.
func NewHostMetricsReceiver(
ctx context.Context,
logger *zap.Logger,
Expand All @@ -45,15 +43,14 @@ func NewHostMetricsReceiver(

scrapers := make([]internal.Scraper, 0)
for key, cfg := range config.Scrapers {
scraper, err := factories[key].CreateMetricsScraper(ctx, logger, cfg)
scraper, err := factories[key].CreateMetricsScraper(ctx, logger, cfg, consumer)
if err != nil {
return nil, fmt.Errorf("cannot create scraper: %s", err.Error())
}
scrapers = append(scrapers, scraper)
}

hmr := &Receiver{
consumer: consumer,
config: config,
scrapers: scrapers,
}
Expand All @@ -63,8 +60,6 @@ func NewHostMetricsReceiver(

// Start begins scraping host metrics based on the OS platform.
func (hmr *Receiver) Start(ctx context.Context, host component.Host) error {
ctx, hmr.cancel = context.WithCancel(ctx)

go func() {
for _, scraper := range hmr.scrapers {
err := scraper.Start(ctx)
Expand All @@ -80,8 +75,6 @@ func (hmr *Receiver) Start(ctx context.Context, host component.Host) error {

// Shutdown stops the underlying host metrics scrapers.
func (hmr *Receiver) Shutdown(ctx context.Context) error {
hmr.cancel()

var errs []error

for _, scraper := range hmr.scrapers {
Expand Down
50 changes: 44 additions & 6 deletions receiver/hostmetricsreceiver/hostmetrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,74 @@ package hostmetricsreceiver

import (
"context"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/component/componenttest"
"github.com/open-telemetry/opentelemetry-collector/consumer/pdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
)

func TestGatherMetrics_EndToEnd(t *testing.T) {
sink := &exportertest.SinkMetricsExporter{}

config := &Config{
DefaultCollectionInterval: 0,
Scrapers: map[string]internal.Config{},
Scrapers: map[string]internal.Config{
cpuscraper.TypeStr: &cpuscraper.Config{
ConfigSettings: internal.ConfigSettings{CollectionIntervalValue: 100 * time.Millisecond},
ReportPerCPU: true,
},
},
}

factories := map[string]internal.Factory{}
factories := map[string]internal.Factory{
cpuscraper.TypeStr: &cpuscraper.Factory{},
}

receiver, err := NewHostMetricsReceiver(context.Background(), zap.NewNop(), config, factories, sink)

if runtime.GOOS != "windows" {
require.Error(t, err, "Expected error when creating a host metrics receiver with cpuscraper collector on a non-windows environment")
return
}

require.NoError(t, err, "Failed to create metrics receiver: %v", err)

err = receiver.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Failed to start metrics receiver: %v", err)
defer func() { assert.NoError(t, receiver.Shutdown(context.Background())) }()

got := sink.AllMetrics()
require.Eventually(t, func() bool {
got := sink.AllMetrics()
if len(got) == 0 {
return false
}

assertMetricData(t, got)
return true
}, time.Second, 10*time.Millisecond, "No metrics were collected")
}

func assertMetricData(t *testing.T, got []pdata.Metrics) {
metrics := internal.AssertSingleMetricDataAndGetMetricsSlice(t, got)

// expect 1 metric
assert.Equal(t, 1, metrics.Len())

// expect 0 MetricData objects
assert.Equal(t, 0, len(got))
// for cpu seconds metric, expect 5 timeseries with appropriate labels
hostCPUTimeMetric := metrics.At(0)
internal.AssertDescriptorEqual(t, cpuscraper.MetricCPUSecondsDescriptor, hostCPUTimeMetric.MetricDescriptor())
assert.Equal(t, 4*runtime.NumCPU(), hostCPUTimeMetric.Int64DataPoints().Len())
internal.AssertInt64MetricLabelExists(t, hostCPUTimeMetric, 0, cpuscraper.CPULabel)
internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 0, cpuscraper.StateLabel, cpuscraper.UserStateLabelValue)
internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 1, cpuscraper.StateLabel, cpuscraper.SystemStateLabelValue)
internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 2, cpuscraper.StateLabel, cpuscraper.IdleStateLabelValue)
internal.AssertInt64MetricLabelHasValue(t, hostCPUTimeMetric, 3, cpuscraper.StateLabel, cpuscraper.InterruptStateLabelValue)
}
39 changes: 39 additions & 0 deletions receiver/hostmetricsreceiver/internal/metricutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"github.com/open-telemetry/opentelemetry-collector/consumer/pdata"
"github.com/open-telemetry/opentelemetry-collector/internal/data"
)

// Initializes a metric with a metric slice and returns it.
func InitializeMetricSlice(metricData data.MetricData) pdata.MetricSlice {
rms := metricData.ResourceMetrics()
rms.Resize(1)
rm := rms.At(0)
ilms := rm.InstrumentationLibraryMetrics()
ilms.Resize(1)
ilm := ilms.At(0)
return ilm.Metrics()
}

// AddNewMetric appends an empty metric to the metric slice, resizing
// the slice by 1, and returns the new metric.
func AddNewMetric(metrics pdata.MetricSlice) pdata.Metric {
len := metrics.Len()
metrics.Resize(len + 1)
return metrics.At(len)
}
Loading

0 comments on commit 9dffd72

Please sign in to comment.