diff --git a/receiver/awscontainerinsightreceiver/factory.go b/receiver/awscontainerinsightreceiver/factory.go index ff7d01aee2b9..a0fc586ad2bb 100644 --- a/receiver/awscontainerinsightreceiver/factory.go +++ b/receiver/awscontainerinsightreceiver/factory.go @@ -22,6 +22,11 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor" + hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" ) // Factory for awscontainerinsightreceiver @@ -62,5 +67,13 @@ func createMetricsReceiver( ) (component.MetricsReceiver, error) { rCfg := baseCfg.(*Config) - return New(params.Logger, rCfg, consumer) + logger := params.Logger + hostInfo, err := hostInfo.NewInfo(rCfg.CollectionInterval, logger) + // TODO: I will need to change the code here to let cadvisor and k8sapiserver return err as well + if err != nil { + logger.Warn("failed to initialize hostInfo", zap.Error(err)) + } + cadvisor := cadvisor.New(rCfg.ContainerOrchestrator, hostInfo, logger) + k8sapiserver := k8sapiserver.New(hostInfo, logger) + return New(logger, rCfg, consumer, cadvisor, k8sapiserver) } diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index c24f93589002..596d48f0e2fb 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -3,7 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscon go 1.16 require ( + github.com/aws/aws-sdk-go v1.38.51 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil v0.0.0-00010101000000-000000000000 + github.com/shirou/gopsutil v3.21.4+incompatible github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.27.1-0.20210527142130-1f972bbd7997 go.uber.org/zap v1.17.0 ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil => ./../../internal/aws/awsutil diff --git a/receiver/awscontainerinsightreceiver/go.sum b/receiver/awscontainerinsightreceiver/go.sum index aa89da20861f..e93938b7d196 100644 --- a/receiver/awscontainerinsightreceiver/go.sum +++ b/receiver/awscontainerinsightreceiver/go.sum @@ -133,6 +133,8 @@ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= github.com/aws/aws-sdk-go v1.38.3 h1:QCL/le04oAz2jELMRSuJVjGT7H+4hhoQc66eMPCfU/k= github.com/aws/aws-sdk-go v1.38.3/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.38.51 h1:aKQmbVbwOCuQSd8+fm/MR3bq0QOsu9Q7S+/QEND36oQ= +github.com/aws/aws-sdk-go v1.38.51/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -188,6 +190,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.11 h1:07n33Z8lZxZ2qwegKbObQohDhXDQxiMMz1NOUGYlesw= github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b/go.mod h1:v9FBN7gdVTpiD/+LZ7Po0UKvROyT87uLVxTHVky/dlQ= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= @@ -504,7 +507,9 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= @@ -578,6 +583,7 @@ github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKe github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hetznercloud/hcloud-go v1.24.0 h1:/CeHDzhH3Fhm83pjxvE3xNNLbvACl0Lu1/auJ83gG5U= github.com/hetznercloud/hcloud-go v1.24.0/go.mod h1:3YmyK8yaZZ48syie6xpm3dt26rtB6s65AisBHylXYFA= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -768,11 +774,13 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -960,6 +968,7 @@ github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3 github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v0.0.0-20161117074351-18a02ba4a312/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1116,6 +1125,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1509,6 +1519,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify/fsnotify.v1 v1.4.7 h1:XNNYLJHt73EyYiCZi6+xjupS9CpvmiDgjPTAjrBlQbo= gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= @@ -1526,6 +1537,7 @@ gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLv gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go index fed839b0fa21..2c05207b8348 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux.go @@ -28,7 +28,7 @@ type Cadvisor struct { } // New creates a Cadvisor struct which can generate metrics from embedded cadvisor lib -func New(containerOrchestrator string, machineInfo *host.MachineInfo, logger *zap.Logger) *Cadvisor { +func New(containerOrchestrator string, machineInfo *host.Info, logger *zap.Logger) *Cadvisor { // TODO: initialize the cadvisor return &Cadvisor{} } diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go index 8a333c28e3b9..cde6448adea1 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_linux_test.go @@ -27,7 +27,7 @@ import ( ) func TestGetMetrics(t *testing.T) { - machineInfo := host.NewMachineInfo(time.Minute, zap.NewNop()) + machineInfo, _ := host.NewInfo(time.Minute, zap.NewNop()) c := New("eks", machineInfo, zap.NewNop()) assert.NotNil(t, c) assert.NotNil(t, c.GetMetrics()) diff --git a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go index 9567bd9b89d0..8930338d59fe 100644 --- a/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go +++ b/receiver/awscontainerinsightreceiver/internal/cadvisor/cadvisor_nolinux.go @@ -30,7 +30,7 @@ type Cadvisor struct { } // New is a dummy function to construct a dummy Cadvisor struct for windows -func New(containerOrchestrator string, machineInfo *host.MachineInfo, logger *zap.Logger) *Cadvisor { +func New(containerOrchestrator string, machineInfo *host.Info, logger *zap.Logger) *Cadvisor { return &Cadvisor{} } diff --git a/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go new file mode 100644 index 000000000000..4658bacb5762 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume.go @@ -0,0 +1,225 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "bufio" + "context" + "fmt" + "os" + "path/filepath" + "regexp" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ec2" + "go.uber.org/zap" +) + +var ebsMountPointRegex = regexp.MustCompile(`kubernetes\.io/aws-ebs/mounts/aws/(.+)/(vol-\w+)$`) + +type ebsVolumeClient interface { + DescribeVolumesWithContext(context.Context, *ec2.DescribeVolumesInput, ...request.Option) (*ec2.DescribeVolumesOutput, error) +} + +type ebsVolumeProvider interface { + getEBSVolumeID(devName string) string + extractEbsIDsUsedByKubernetes() map[string]string +} + +type ebsVolume struct { + refreshInterval time.Duration + maxJitterTime time.Duration + instanceID string + client ebsVolumeClient + logger *zap.Logger + shutdownC chan bool + + mu sync.RWMutex + // device name to volumeID mapping + dev2Vol map[string]string + + // for testing only + hostMounts string + osLstat func(name string) (os.FileInfo, error) + evalSymLinks func(path string) (string, error) +} + +type ebsVolumeOption func(*ebsVolume) + +func newEBSVolume(ctx context.Context, session *session.Session, instanceID string, region string, + refreshInterval time.Duration, logger *zap.Logger, options ...ebsVolumeOption) ebsVolumeProvider { + e := &ebsVolume{ + dev2Vol: make(map[string]string), + instanceID: instanceID, + client: ec2.New(session, aws.NewConfig().WithRegion(region)), + refreshInterval: refreshInterval, + maxJitterTime: 3 * time.Second, + shutdownC: make(chan bool), + logger: logger, + hostMounts: hostMounts, + osLstat: os.Lstat, + evalSymLinks: filepath.EvalSymlinks, + } + + for _, opt := range options { + opt(e) + } + + shouldRefresh := func() bool { + // keep refreshing to get updated ebs volumes + return true + } + go refreshUntil(ctx, e.refresh, e.refreshInterval, shouldRefresh, e.maxJitterTime) + + return e +} + +func (e *ebsVolume) refresh(ctx context.Context) { + e.logger.Info("Fetch ebs volumes from ec2 api") + + input := &ec2.DescribeVolumesInput{ + Filters: []*ec2.Filter{ + { + Name: aws.String("attachment.instance-id"), + Values: aws.StringSlice([]string{e.instanceID}), + }, + }, + } + + devPathSet := make(map[string]bool) + allSuccess := false + for { + result, err := e.client.DescribeVolumesWithContext(ctx, input) + if err != nil { + e.logger.Warn("Fail to call ec2 DescribeVolumes", zap.Error(err)) + break + } + for _, volume := range result.Volumes { + for _, attachment := range volume.Attachments { + devPath := e.addEBSVolumeMapping(volume.AvailabilityZone, attachment) + devPathSet[devPath] = true + } + } + allSuccess = true + if result.NextToken == nil { + break + } + input.SetNextToken(*result.NextToken) + } + + if allSuccess { + e.mu.Lock() + defer e.mu.Unlock() + for k := range e.dev2Vol { + if !devPathSet[k] { + delete(e.dev2Vol, k) + } + } + } +} + +func (e *ebsVolume) addEBSVolumeMapping(zone *string, attachement *ec2.VolumeAttachment) string { + // *attachement.Device is sth like: /dev/xvda + devPath := e.findNvmeBlockNameIfPresent(*attachement.Device) + if devPath == "" { + devPath = *attachement.Device + } + + e.mu.Lock() + defer e.mu.Unlock() + e.dev2Vol[devPath] = fmt.Sprintf("aws://%s/%s", *zone, *attachement.VolumeId) + return devPath +} + +// find nvme block name by symlink, if symlink doesn't exist, return "" +func (e *ebsVolume) findNvmeBlockNameIfPresent(devName string) string { + // for nvme(ssd), there is a symlink from devName to nvme block name, i.e. /dev/xvda -> /dev/nvme0n1 + // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html + hasRootFs := true + if _, err := e.osLstat(hostProc); os.IsNotExist(err) { + hasRootFs = false + } + nvmeName := "" + + if hasRootFs { + devName = "/rootfs" + devName + } + + if info, err := e.osLstat(devName); err == nil { + if info.Mode()&os.ModeSymlink == os.ModeSymlink { + if path, err := e.evalSymLinks(devName); err == nil { + nvmeName = path + } + } + } + + if nvmeName != "" && hasRootFs { + nvmeName = strings.TrimPrefix(nvmeName, "/rootfs") + } + return nvmeName +} + +func (e *ebsVolume) getEBSVolumeID(devName string) string { + e.mu.RLock() + defer e.mu.RUnlock() + + for k, v := range e.dev2Vol { + // The key of dev2Vol is device name like nvme0n1, while the input devName could be a partition name like nvme0n1p1 + if strings.HasPrefix(devName, k) { + return v + } + } + + return "" +} + +//extract the ebs volume id used by kubernetes cluster +func (e *ebsVolume) extractEbsIDsUsedByKubernetes() map[string]string { + ebsVolumeIDs := make(map[string]string) + + file, err := os.Open(e.hostMounts) + if err != nil { + e.logger.Debug("cannot open /rootfs/proc/mounts", zap.Error(err)) + return ebsVolumeIDs + } + defer file.Close() + + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + lineStr := scanner.Text() + if strings.TrimSpace(lineStr) == "" { + continue + } + + //example line: /dev/nvme1n1 /var/lib/kubelet/plugins/kubernetes.io/aws-ebs/mounts/aws/us-west-2b/vol-0d9f0816149eb2050 ext4 rw,relatime,data=ordered 0 0 + keys := strings.Split(lineStr, " ") + if len(keys) < 2 { + continue + } + matches := ebsMountPointRegex.FindStringSubmatch(keys[1]) + if len(matches) > 0 { + // Set {"/dev/nvme1n1": "aws://us-west-2b/vol-0d9f0816149eb2050"} + ebsVolumeIDs[keys[0]] = fmt.Sprintf("aws://%s/%s", matches[1], matches[2]) + } + } + + return ebsVolumeIDs +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/ebsvolume_test.go b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume_test.go new file mode 100644 index 000000000000..246b7aa06fbc --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/ebsvolume_test.go @@ -0,0 +1,195 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "errors" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/awstesting/mock" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +type mockEBSVolumeClient struct { + count int + success chan bool + mu sync.Mutex +} + +func (m *mockEBSVolumeClient) DescribeVolumesWithContext(context.Context, *ec2.DescribeVolumesInput, + ...request.Option) (*ec2.DescribeVolumesOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.count++ + + if m.count == 1 { + return &ec2.DescribeVolumesOutput{}, errors.New("error") + } + + if m.count == 2 { + return &ec2.DescribeVolumesOutput{ + NextToken: aws.String("nextToken"), + Volumes: []*ec2.Volume{ + { + AvailabilityZone: aws.String("us-west-2"), + Attachments: []*ec2.VolumeAttachment{ + { + Device: aws.String("/dev/xvdb"), + VolumeId: aws.String("vol-0c241693efb58734a"), + }, + }, + }, + }, + }, nil + } + + if m.count == 3 { + return &ec2.DescribeVolumesOutput{ + // NextToken: nil, + Volumes: []*ec2.Volume{ + { + AvailabilityZone: aws.String("us-west-2"), + Attachments: []*ec2.VolumeAttachment{ + { + Device: aws.String("/dev/xvdc"), + VolumeId: aws.String("vol-0303a1cc896c42d28"), + }, + }, + }, + }, + }, nil + } + + if m.count == 4 { + close(m.success) + } + + return &ec2.DescribeVolumesOutput{ + // NextToken: nil, + Volumes: []*ec2.Volume{ + { + AvailabilityZone: aws.String("us-west-2"), + Attachments: []*ec2.VolumeAttachment{ + { + Device: aws.String("/dev/xvdc"), + VolumeId: aws.String("vol-0303a1cc896c42d28"), + }, + }, + }, + { + AvailabilityZone: aws.String("us-west-2"), + Attachments: []*ec2.VolumeAttachment{ + { + Device: aws.String("/dev/xvdb"), + VolumeId: aws.String("vol-0c241693efb58734a"), + }, + }, + }, + }, + }, nil +} + +type mockFileInfo struct { +} + +func (m *mockFileInfo) Name() string { + return "mockFileInfo" +} + +func (m *mockFileInfo) Size() int64 { + return 256 +} + +func (m *mockFileInfo) Mode() os.FileMode { + return os.ModeSymlink +} + +func (m *mockFileInfo) ModTime() time.Time { + return time.Now() +} + +func (m *mockFileInfo) IsDir() bool { + return false +} + +func (m *mockFileInfo) Sys() interface{} { + return nil +} + +func TestEBSVolume(t *testing.T) { + ctx := context.Background() + sess := mock.Session + mockVolumeClient := &mockEBSVolumeClient{ + success: make(chan bool), + } + clientOption := func(e *ebsVolume) { + e.client = mockVolumeClient + } + maxJitterOption := func(e *ebsVolume) { + e.maxJitterTime = time.Millisecond + } + hostMountsOption := func(e *ebsVolume) { + e.hostMounts = "./testdata/mounts" + } + + LstatOption := func(e *ebsVolume) { + e.osLstat = func(name string) (os.FileInfo, error) { + if name == hostProc { + return &mockFileInfo{}, nil + } + + return &mockFileInfo{}, nil + } + } + + evalSymLinksOption := func(e *ebsVolume) { + e.evalSymLinks = func(path string) (string, error) { + if strings.HasSuffix(path, "/dev/xvdb") { + return "/dev/nvme0n2", nil + } + return "", errors.New("error") + } + } + + e := newEBSVolume(ctx, sess, "instanceId", "us-west-2", time.Millisecond, zap.NewNop(), + clientOption, maxJitterOption, hostMountsOption, LstatOption, evalSymLinksOption) + + <-mockVolumeClient.success + assert.Equal(t, "aws://us-west-2/vol-0303a1cc896c42d28", e.getEBSVolumeID("/dev/xvdc")) + assert.Equal(t, "aws://us-west-2/vol-0c241693efb58734a", e.getEBSVolumeID("/dev/nvme0n2")) + assert.Equal(t, "", e.getEBSVolumeID("/dev/invalid")) + + ebsIds := e.extractEbsIDsUsedByKubernetes() + assert.Equal(t, 1, len(ebsIds)) + assert.Equal(t, "aws://us-west-2b/vol-0d9f0816149eb2050", ebsIds["/dev/nvme1n1"]) + + //set e.hostMounts to an invalid path + hostMountsOption = func(e *ebsVolume) { + e.hostMounts = "/an-invalid-path" + } + e = newEBSVolume(ctx, sess, "instanceId", "us-west-2", time.Millisecond, zap.NewNop(), + clientOption, maxJitterOption, hostMountsOption, LstatOption, evalSymLinksOption) + ebsIds = e.extractEbsIDsUsedByKubernetes() + assert.Equal(t, 0, len(ebsIds)) +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/ec2metadata.go b/receiver/awscontainerinsightreceiver/internal/host/ec2metadata.go new file mode 100644 index 000000000000..a6ae1f776cad --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/ec2metadata.go @@ -0,0 +1,100 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "time" + + awsec2metadata "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" + "go.uber.org/zap" +) + +type metadataClient interface { + GetInstanceIdentityDocumentWithContext(ctx context.Context) (awsec2metadata.EC2InstanceIdentityDocument, error) +} + +type ec2MetadataProvider interface { + getInstanceID() string + getInstanceType() string + getRegion() string +} + +type ec2Metadata struct { + logger *zap.Logger + client metadataClient + refreshInterval time.Duration + instanceID string + instanceType string + region string + instanceIDReadyC chan bool +} + +type ec2MetadataOption func(*ec2Metadata) + +func newEC2Metadata(ctx context.Context, session *session.Session, refreshInterval time.Duration, + instanceIDReadyC chan bool, logger *zap.Logger, options ...ec2MetadataOption) ec2MetadataProvider { + emd := &ec2Metadata{ + client: awsec2metadata.New(session), + refreshInterval: refreshInterval, + instanceIDReadyC: instanceIDReadyC, + logger: logger, + } + + for _, opt := range options { + opt(emd) + } + + shouldRefresh := func() bool { + //stop the refresh once we get instance ID and type successfully + return emd.instanceID == "" || emd.instanceType == "" + } + + go refreshUntil(ctx, emd.refresh, emd.refreshInterval, shouldRefresh, 0) + + return emd +} + +func (emd *ec2Metadata) refresh(ctx context.Context) { + emd.logger.Info("Fetch instance id and type from ec2 metadata") + + doc, err := emd.client.GetInstanceIdentityDocumentWithContext(ctx) + if err != nil { + emd.logger.Error("Failed to get ec2 metadata", zap.Error(err)) + return + } + + emd.instanceID = doc.InstanceID + emd.instanceType = doc.InstanceType + emd.region = doc.Region + + // notify ec2tags and ebsvolume that the instance id is ready + if emd.instanceID != "" { + close(emd.instanceIDReadyC) + } +} + +func (emd *ec2Metadata) getInstanceID() string { + return emd.instanceID +} + +func (emd *ec2Metadata) getInstanceType() string { + return emd.instanceType +} + +func (emd *ec2Metadata) getRegion() string { + return emd.region +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/ec2metadata_test.go b/receiver/awscontainerinsightreceiver/internal/host/ec2metadata_test.go new file mode 100644 index 000000000000..ed7a5ae9da52 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/ec2metadata_test.go @@ -0,0 +1,60 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "errors" + "testing" + "time" + + awsec2metadata "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/awstesting/mock" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +type mockMetadataClient struct { + count int +} + +func (m *mockMetadataClient) GetInstanceIdentityDocumentWithContext(ctx context.Context) (awsec2metadata.EC2InstanceIdentityDocument, error) { + m.count++ + if m.count == 1 { + return awsec2metadata.EC2InstanceIdentityDocument{}, errors.New("error") + } + + return awsec2metadata.EC2InstanceIdentityDocument{ + Region: "us-west-2", + InstanceID: "i-abcd1234", + InstanceType: "c4.xlarge", + }, nil +} + +func TestEC2Metadata(t *testing.T) { + ctx := context.Background() + sess := mock.Session + instanceIDReadyC := make(chan bool) + clientOption := func(e *ec2Metadata) { + e.client = &mockMetadataClient{} + } + e := newEC2Metadata(ctx, sess, 3*time.Millisecond, instanceIDReadyC, zap.NewNop(), clientOption) + assert.NotNil(t, e) + + <-instanceIDReadyC + assert.Equal(t, "i-abcd1234", e.getInstanceID()) + assert.Equal(t, "c4.xlarge", e.getInstanceType()) + assert.Equal(t, "us-west-2", e.getRegion()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go b/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go new file mode 100644 index 000000000000..4c3bdcab6039 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/ec2tags.go @@ -0,0 +1,141 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ec2" + "go.uber.org/zap" +) + +const ( + clusterNameKey = "container-insight-eks-cluster-name" + clusterNameTagKeyPrefix = "kubernetes.io/cluster/" + autoScalingGroupNameTag = "aws:autoscaling:groupName" +) + +type ec2TagsClient interface { + DescribeTagsWithContext(ctx context.Context, input *ec2.DescribeTagsInput, + opts ...request.Option) (*ec2.DescribeTagsOutput, error) +} + +type ec2TagsProvider interface { + getClusterName() string + getAutoScalingGroupName() string +} + +type ec2Tags struct { + refreshInterval time.Duration + maxJitterTime time.Duration + instanceID string + client ec2TagsClient + clusterName string + autoScalingGroupName string + isSucess chan bool //only used in testing + logger *zap.Logger +} + +type ec2TagsOption func(*ec2Tags) + +func newEC2Tags(ctx context.Context, session *session.Session, instanceID string, + refreshInterval time.Duration, logger *zap.Logger, options ...ec2TagsOption) ec2TagsProvider { + et := &ec2Tags{ + instanceID: instanceID, + client: ec2.New(session), + refreshInterval: refreshInterval, + maxJitterTime: 3 * time.Second, + logger: logger, + } + + for _, opt := range options { + opt(et) + } + + shouldRefresh := func() bool { + //stop once we get the cluster name + return et.clusterName == "" + } + + go refreshUntil(ctx, et.refresh, et.refreshInterval, shouldRefresh, et.maxJitterTime) + + return et +} + +func (et *ec2Tags) fetchEC2Tags(ctx context.Context) map[string]string { + et.logger.Info("Fetch ec2 tags to detect cluster name and auto scaling group name", zap.String("instanceId", et.instanceID)) + tags := make(map[string]string) + + tagFilters := []*ec2.Filter{ + { + Name: aws.String("resource-type"), + Values: aws.StringSlice([]string{"instance"}), + }, + { + Name: aws.String("resource-id"), + Values: aws.StringSlice([]string{et.instanceID}), + }, + } + + input := &ec2.DescribeTagsInput{ + Filters: tagFilters, + } + + for { + result, err := et.client.DescribeTagsWithContext(ctx, input) + if err != nil { + et.logger.Warn("Fail to call ec2 DescribeTags", zap.Error(err), zap.String("instanceId", et.instanceID)) + break + } + + for _, tag := range result.Tags { + key := *tag.Key + tags[key] = *tag.Value + if strings.HasPrefix(key, clusterNameTagKeyPrefix) && *tag.Value == "owned" { + tags[clusterNameKey] = key[len(clusterNameTagKeyPrefix):] + } + } + + if result.NextToken == nil { + break + } + input.SetNextToken(*result.NextToken) + } + + return tags +} + +func (et *ec2Tags) getClusterName() string { + return et.clusterName +} + +func (et *ec2Tags) getAutoScalingGroupName() string { + return et.autoScalingGroupName +} + +func (et *ec2Tags) refresh(ctx context.Context) { + tags := et.fetchEC2Tags(ctx) + et.clusterName = tags[clusterNameKey] + et.autoScalingGroupName = tags[autoScalingGroupNameTag] + if et.isSucess != nil && et.clusterName != "" && et.autoScalingGroupName != "" { + // this will be executed only in testing + close(et.isSucess) + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go b/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go new file mode 100644 index 000000000000..ebb4308b3727 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/ec2tags_test.go @@ -0,0 +1,89 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/awstesting/mock" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +type mockEC2TagsClient struct { + count int +} + +var tokenString = "tokenString" +var clusterKey = clusterNameTagKeyPrefix + "cluster-name" +var clusterValue = "owned" +var asgKey = autoScalingGroupNameTag +var asgValue = "asg" + +func (m *mockEC2TagsClient) DescribeTagsWithContext(ctx context.Context, input *ec2.DescribeTagsInput, + opts ...request.Option) (*ec2.DescribeTagsOutput, error) { + m.count++ + if m.count == 1 { + return &ec2.DescribeTagsOutput{}, errors.New("error") + } + + if m.count == 2 { + return &ec2.DescribeTagsOutput{ + NextToken: &tokenString, + Tags: []*ec2.TagDescription{ + { + Key: &asgKey, + Value: &asgValue, + }, + }, + }, nil + } + + return &ec2.DescribeTagsOutput{ + Tags: []*ec2.TagDescription{ + { + Key: &clusterKey, + Value: &clusterValue, + }, + }, + }, nil +} + +func TestEC2Tags(t *testing.T) { + ctx := context.Background() + sess := mock.Session + clientOption := func(e *ec2Tags) { + e.client = &mockEC2TagsClient{} + } + maxJitterOption := func(e *ec2Tags) { + e.maxJitterTime = 0 + } + isSucessOption := func(e *ec2Tags) { + e.isSucess = make(chan bool) + } + et := newEC2Tags(ctx, sess, "instanceId", time.Millisecond, zap.NewNop(), clientOption, + maxJitterOption, isSucessOption) + + // wait for ec2 tags are fetched + e := et.(*ec2Tags) + <-e.isSucess + assert.Equal(t, "cluster-name", et.getClusterName()) + assert.Equal(t, "asg", et.getAutoScalingGroupName()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go b/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go new file mode 100644 index 000000000000..13cc8b316b8e --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/hostinfo.go @@ -0,0 +1,169 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" +) + +// Info contains information about a host +type Info struct { + cancel context.CancelFunc + logger *zap.Logger + awsSession *session.Session + refreshInterval time.Duration + instanceIDReadyC chan bool // close of this channel indicates instance ID is ready + + ebsVolumeReadyC chan bool // close of this channel indicates ebsVolume is initialized. It is used only in test + ec2TagsReadyC chan bool // close of this channel indicates ec2Tags is initialized. It is used only in test + + nodeCapacity nodeCapacityProvider + ec2Metadata ec2MetadataProvider + ebsVolume ebsVolumeProvider + ec2Tags ec2TagsProvider + + awsSessionCreator func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) + nodeCapacityCreator func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) + ec2MetadataCreator func(context.Context, *session.Session, time.Duration, chan bool, *zap.Logger, ...ec2MetadataOption) ec2MetadataProvider + ebsVolumeCreator func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, ...ebsVolumeOption) ebsVolumeProvider + ec2TagsCreator func(context.Context, *session.Session, string, time.Duration, *zap.Logger, ...ec2TagsOption) ec2TagsProvider +} + +type machineInfoOption func(*Info) + +// NewInfo creates a new Info struct +func NewInfo(refreshInterval time.Duration, logger *zap.Logger, options ...machineInfoOption) (*Info, error) { + ctx, cancel := context.WithCancel(context.Background()) + mInfo := &Info{ + cancel: cancel, + refreshInterval: refreshInterval, + instanceIDReadyC: make(chan bool), + logger: logger, + + awsSessionCreator: awsutil.GetAWSConfigSession, + nodeCapacityCreator: newNodeCapacity, + ec2MetadataCreator: newEC2Metadata, + ebsVolumeCreator: newEBSVolume, + ec2TagsCreator: newEC2Tags, + + // used in test only + ebsVolumeReadyC: make(chan bool), + ec2TagsReadyC: make(chan bool), + } + + for _, opt := range options { + opt(mInfo) + } + + nodeCapacity, err := mInfo.nodeCapacityCreator(logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize NodeCapacity: %v", err) + } + mInfo.nodeCapacity = nodeCapacity + + defaultSessionConfig := awsutil.CreateDefaultSessionConfig() + _, session, err := mInfo.awsSessionCreator(logger, &awsutil.Conn{}, &defaultSessionConfig) + if err != nil { + return nil, fmt.Errorf("failed to create aws session: %v", err) + } + mInfo.awsSession = session + + mInfo.ec2Metadata = mInfo.ec2MetadataCreator(ctx, session, refreshInterval, mInfo.instanceIDReadyC, logger) + + go mInfo.lazyInitEBSVolume(ctx) + go mInfo.lazyInitEC2Tags(ctx) + return mInfo, nil +} + +func (m *Info) lazyInitEBSVolume(ctx context.Context) { + //wait until the instance id is ready + <-m.instanceIDReadyC + //Because ebs volumes only change occasionally, we refresh every 5 collection intervals to reduce ec2 api calls + m.ebsVolume = m.ebsVolumeCreator(ctx, m.awsSession, m.GetInstanceID(), m.GetRegion(), + 5*m.refreshInterval, m.logger) + close(m.ebsVolumeReadyC) +} + +func (m *Info) lazyInitEC2Tags(ctx context.Context) { + //wait until the instance id is ready + <-m.instanceIDReadyC + m.ec2Tags = m.ec2TagsCreator(ctx, m.awsSession, m.GetInstanceID(), m.refreshInterval, m.logger) + close(m.ec2TagsReadyC) +} + +// GetInstanceID returns the ec2 instance id for the host +func (m *Info) GetInstanceID() string { + return m.ec2Metadata.getInstanceID() +} + +// GetInstanceType returns the ec2 instance type for the host +func (m *Info) GetInstanceType() string { + return m.ec2Metadata.getInstanceType() +} + +// GetRegion returns the region for the host +func (m *Info) GetRegion() string { + return m.ec2Metadata.getRegion() +} + +// GetNumCores returns the number of cpu cores on the host +func (m *Info) GetNumCores() int64 { + return m.nodeCapacity.getNumCores() +} + +// GetMemoryCapacity returns the total memory (in bytes) on the host +func (m *Info) GetMemoryCapacity() int64 { + return m.nodeCapacity.getMemoryCapacity() +} + +// GetEBSVolumeID returns the ebs volume id corresponding to the given device name +func (m *Info) GetEBSVolumeID(devName string) string { + if m.ebsVolume != nil { + return m.ebsVolume.getEBSVolumeID(devName) + } + + return "" +} + +// GetClusterName returns the cluster name associated with the host +func (m *Info) GetClusterName() string { + if m.ec2Tags != nil { + return m.ec2Tags.getClusterName() + } + + return "" +} + +// GetAutoScalingGroupName returns the auto scaling group associated with the host +func (m *Info) GetAutoScalingGroupName() string { + if m.ec2Tags != nil { + return m.ec2Tags.getAutoScalingGroupName() + } + + return "" +} + +// Shutdown stops the host Info +func (m *Info) Shutdown() { + m.cancel() +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go b/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go new file mode 100644 index 000000000000..d3c43af46a9f --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/hostinfo_test.go @@ -0,0 +1,151 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/awsutil" +) + +type mockNodeCapacity struct { +} + +func (m *mockNodeCapacity) getMemoryCapacity() int64 { + return 1024 +} + +func (m *mockNodeCapacity) getNumCores() int64 { + return 2 +} + +type mockEC2Metadata struct { +} + +func (m *mockEC2Metadata) getInstanceID() string { + return "instance-id" +} + +func (m *mockEC2Metadata) getInstanceType() string { + return "instance-type" +} + +func (m *mockEC2Metadata) getRegion() string { + return "region" +} + +type mockEBSVolume struct { +} + +func (m *mockEBSVolume) getEBSVolumeID(devName string) string { + return "ebs-volume-id" +} + +func (m *mockEBSVolume) extractEbsIDsUsedByKubernetes() map[string]string { + return map[string]string{} +} + +type mockEC2Tags struct { +} + +func (m *mockEC2Tags) getClusterName() string { + return "cluster-name" +} + +func (m *mockEC2Tags) getAutoScalingGroupName() string { + return "asg" +} + +func TestInfo(t *testing.T) { + // test the case when nodeCapacity fails to initialize + nodeCapacityCreatorOpt := func(m *Info) { + m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) { + return nil, errors.New("error") + } + } + m, err := NewInfo(time.Minute, zap.NewNop(), nodeCapacityCreatorOpt) + assert.Nil(t, m) + assert.NotNil(t, err) + + // test the case when aws session fails to initialize + nodeCapacityCreatorOpt = func(m *Info) { + m.nodeCapacityCreator = func(*zap.Logger, ...nodeCapacityOption) (nodeCapacityProvider, error) { + return &mockNodeCapacity{}, nil + } + } + awsSessionCreatorOpt := func(m *Info) { + m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { + return nil, nil, errors.New("error") + } + } + m, err = NewInfo(time.Minute, zap.NewNop(), nodeCapacityCreatorOpt, awsSessionCreatorOpt) + assert.Nil(t, m) + assert.NotNil(t, err) + + // test normal case where everything is working + awsSessionCreatorOpt = func(m *Info) { + m.awsSessionCreator = func(*zap.Logger, awsutil.ConnAttr, *awsutil.AWSSessionSettings) (*aws.Config, *session.Session, error) { + return &aws.Config{}, &session.Session{}, nil + } + } + ec2MetadataCreatorOpt := func(m *Info) { + m.ec2MetadataCreator = func(context.Context, *session.Session, time.Duration, chan bool, *zap.Logger, + ...ec2MetadataOption) ec2MetadataProvider { + return &mockEC2Metadata{} + } + } + ebsVolumeCreatorOpt := func(m *Info) { + m.ebsVolumeCreator = func(context.Context, *session.Session, string, string, time.Duration, *zap.Logger, + ...ebsVolumeOption) ebsVolumeProvider { + return &mockEBSVolume{} + } + } + ec2TagsCreatorOpt := func(m *Info) { + m.ec2TagsCreator = func(context.Context, *session.Session, string, time.Duration, *zap.Logger, + ...ec2TagsOption) ec2TagsProvider { + return &mockEC2Tags{} + } + } + m, err = NewInfo(time.Minute, zap.NewNop(), awsSessionCreatorOpt, + nodeCapacityCreatorOpt, ec2MetadataCreatorOpt, ebsVolumeCreatorOpt, ec2TagsCreatorOpt) + assert.Nil(t, err) + assert.NotNil(t, m) + + // befoe ebsVolume and ec2Tags are initialized + assert.Equal(t, "", m.GetEBSVolumeID("dev")) + assert.Equal(t, "", m.GetClusterName()) + assert.Equal(t, "", m.GetAutoScalingGroupName()) + + // close the channel so that ebsVolume and ec2Tags can be initialized + close(m.instanceIDReadyC) + <-m.ebsVolumeReadyC + <-m.ec2TagsReadyC + + assert.Equal(t, "instance-id", m.GetInstanceID()) + assert.Equal(t, "instance-type", m.GetInstanceType()) + assert.Equal(t, int64(2), m.GetNumCores()) + assert.Equal(t, int64(1024), m.GetMemoryCapacity()) + assert.Equal(t, "ebs-volume-id", m.GetEBSVolumeID("dev")) + assert.Equal(t, "cluster-name", m.GetClusterName()) + assert.Equal(t, "asg", m.GetAutoScalingGroupName()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/machineinfo.go b/receiver/awscontainerinsightreceiver/internal/host/machineinfo.go deleted file mode 100644 index ed2623036f52..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/host/machineinfo.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package host - -import ( - "sync" - "time" - - "go.uber.org/zap" -) - -// MachineInfo contains information about a host -type MachineInfo struct { - sync.RWMutex - logger *zap.Logger - refreshInterval time.Duration - shutdownC chan bool -} - -// NewMachineInfo creates a new MachineInfo struct -func NewMachineInfo(refreshInterval time.Duration, logger *zap.Logger) *MachineInfo { - mInfo := &MachineInfo{ - refreshInterval: refreshInterval, - shutdownC: make(chan bool), - logger: logger, - } - - // TODO: add more initializations - return mInfo -} - -// Shutdown stops the refreshing of machine info -func (m *MachineInfo) Shutdown() { - close(m.shutdownC) -} - -// GetInstanceID returns the ec2 instance id for the host -func (m *MachineInfo) GetInstanceID() string { - //TODO: add implementation - return "" -} - -// GetInstanceType returns the ec2 instance type for the host -func (m *MachineInfo) GetInstanceType() string { - //TODO: add implementation - return "" -} - -// GetNumCores returns the number of cpu cores on the host -func (m *MachineInfo) GetNumCores() int64 { - //TODO: add implementation - return 0 -} - -// GetMemoryCapacity returns the total memory (in bytes) on the host -func (m *MachineInfo) GetMemoryCapacity() int64 { - //TODO: add implementation - return 0 -} - -// GetEbsVolumeID returns the ebs volume id corresponding to the given device name -func (m *MachineInfo) GetEbsVolumeID(devName string) string { - //TODO: add implementation - return "" -} - -// GetClusterName returns the cluster name associated with the host -func (m *MachineInfo) GetClusterName() string { - //TODO: add implementation - return "" -} - -// GetAutoScalingGroupName returns the auto scaling group associated with the host -func (m *MachineInfo) GetAutoScalingGroupName() string { - //TODO: add implementation - return "" -} diff --git a/receiver/awscontainerinsightreceiver/internal/host/machineinfo_test.go b/receiver/awscontainerinsightreceiver/internal/host/machineinfo_test.go deleted file mode 100644 index 07005dc2f028..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/host/machineinfo_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package host - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" -) - -func TestMachineInfo(t *testing.T) { - m := NewMachineInfo(time.Minute, zap.NewNop()) - assert.Equal(t, "", m.GetInstanceID()) - assert.Equal(t, "", m.GetInstanceType()) - assert.Equal(t, int64(0), m.GetNumCores()) - assert.Equal(t, int64(0), m.GetMemoryCapacity()) - assert.Equal(t, "", m.GetEbsVolumeID("dev")) - assert.Equal(t, "", m.GetClusterName()) - assert.Equal(t, "", m.GetAutoScalingGroupName()) - m.Shutdown() -} diff --git a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go new file mode 100644 index 000000000000..943fbb19429a --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity.go @@ -0,0 +1,100 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "fmt" + "os" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/mem" + "go.uber.org/zap" +) + +const ( + goPSUtilProcDirEnv = "HOST_PROC" +) + +type nodeCapacityProvider interface { + getMemoryCapacity() int64 + getNumCores() int64 +} + +type nodeCapacity struct { + memCapacity int64 + cpuCapacity int64 + logger *zap.Logger + + // osLstat returns a FileInfo describing the named file. + osLstat func(name string) (os.FileInfo, error) + // osSetenv sets the value of the environment variable named by the key + osSetenv func(key string, value string) error + virtualMemory func() (*mem.VirtualMemoryStat, error) + cpuInfo func() ([]cpu.InfoStat, error) +} + +type nodeCapacityOption func(*nodeCapacity) + +func newNodeCapacity(logger *zap.Logger, options ...nodeCapacityOption) (nodeCapacityProvider, error) { + nc := &nodeCapacity{ + logger: logger, + osLstat: os.Lstat, + osSetenv: os.Setenv, + virtualMemory: mem.VirtualMemory, + cpuInfo: cpu.Info, + } + + for _, opt := range options { + opt(nc) + } + + if _, err := nc.osLstat(hostProc); os.IsNotExist(err) { + return nil, err + } + if err := nc.osSetenv(goPSUtilProcDirEnv, hostProc); err != nil { + return nil, fmt.Errorf("NodeCapacity cannot set goPSUtilProcDirEnv to %s: %v", hostProc, err) + } + + nc.parseCPU() + nc.parseMemory() + return nc, nil +} + +func (nc *nodeCapacity) parseMemory() { + if memStats, err := nc.virtualMemory(); err == nil { + nc.memCapacity = int64(memStats.Total) + } else { + // If any error happen, then there will be no mem utilization metrics + nc.logger.Error("NodeCapacity cannot get memStats from psUtil", zap.Error(err)) + } +} + +func (nc *nodeCapacity) parseCPU() { + if cpuInfos, err := nc.cpuInfo(); err == nil { + numCores := len(cpuInfos) + nc.cpuCapacity = int64(numCores) + } else { + // If any error happen, then there will be no cpu utilization metrics + nc.logger.Error("NodeCapacity cannot get cpuInfo from psUtil", zap.Error(err)) + } +} + +func (nc *nodeCapacity) getNumCores() int64 { + return nc.cpuCapacity +} + +func (nc *nodeCapacity) getMemoryCapacity() int64 { + return nc.memCapacity +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go new file mode 100644 index 000000000000..f6aa62075041 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/nodeCapacity_test.go @@ -0,0 +1,97 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "errors" + "os" + "testing" + + "github.com/shirou/gopsutil/cpu" + "github.com/shirou/gopsutil/mem" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestNodeCapacity(t *testing.T) { + // no proc directory + lstatOption := func(nc *nodeCapacity) { + nc.osLstat = func(name string) (os.FileInfo, error) { + return nil, os.ErrNotExist + } + } + nc, err := newNodeCapacity(zap.NewNop(), lstatOption) + assert.Nil(t, nc) + assert.NotNil(t, err) + + // can't set environment variables + lstatOption = func(nc *nodeCapacity) { + nc.osLstat = func(name string) (os.FileInfo, error) { + return nil, nil + } + } + setEnvOption := func(nc *nodeCapacity) { + nc.osSetenv = func(key, value string) error { + return errors.New("error") + } + } + nc, err = newNodeCapacity(zap.NewNop(), lstatOption, setEnvOption) + assert.Nil(t, nc) + assert.NotNil(t, err) + + // can't parse cpu and mem info + setEnvOption = func(nc *nodeCapacity) { + nc.osSetenv = func(key, value string) error { + return nil + } + } + virtualMemOption := func(nc *nodeCapacity) { + nc.virtualMemory = func() (*mem.VirtualMemoryStat, error) { + return nil, errors.New("error") + } + } + cpuInfoOption := func(nc *nodeCapacity) { + nc.cpuInfo = func() ([]cpu.InfoStat, error) { + return nil, errors.New("error") + } + } + nc, err = newNodeCapacity(zap.NewNop(), lstatOption, setEnvOption, virtualMemOption, cpuInfoOption) + assert.NotNil(t, nc) + assert.Nil(t, err) + assert.Equal(t, int64(0), nc.getMemoryCapacity()) + assert.Equal(t, int64(0), nc.getNumCores()) + + // normal case where everything is working + virtualMemOption = func(nc *nodeCapacity) { + nc.virtualMemory = func() (*mem.VirtualMemoryStat, error) { + return &mem.VirtualMemoryStat{ + Total: 1024, + }, nil + } + } + cpuInfoOption = func(nc *nodeCapacity) { + nc.cpuInfo = func() ([]cpu.InfoStat, error) { + return []cpu.InfoStat{ + {}, + {}, + }, nil + } + } + nc, err = newNodeCapacity(zap.NewNop(), lstatOption, setEnvOption, virtualMemOption, cpuInfoOption) + assert.NotNil(t, nc) + assert.Nil(t, err) + assert.Equal(t, int64(1024), nc.getMemoryCapacity()) + assert.Equal(t, int64(2), nc.getNumCores()) +} diff --git a/receiver/awscontainerinsightreceiver/internal/host/testdata/mounts b/receiver/awscontainerinsightreceiver/internal/host/testdata/mounts new file mode 100644 index 000000000000..832ecf6adebe --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/testdata/mounts @@ -0,0 +1,4 @@ +/dev/nvme1n1 /var/lib/kubelet/plugins/kubernetes.io/aws-ebs/mounts/aws/us-west-2b/vol-0d9f0816149eb2050 ext4 rw,relatime,data=ordered 0 0 +/dev/nvme1n1 /var/lib/kubelet/pods/df570351-2e4c-11e9-95ea-0a695d7ce286/volumes/kubernetes.io~aws-ebs/pvc-df563cf6-2e4c-11e9-95ea-0a695d7ce286 ext4 rw,relatime,data=ordered 0 0 + +/dev/invalidEntry \ No newline at end of file diff --git a/receiver/awscontainerinsightreceiver/internal/host/utils.go b/receiver/awscontainerinsightreceiver/internal/host/utils.go new file mode 100644 index 000000000000..353e6f999ea7 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/host/utils.go @@ -0,0 +1,67 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package host + +import ( + "context" + "hash/fnv" + "os" + "time" +) + +const ( + rootfs = "/rootfs" // the root directory "/" is mounted as "/rootfs" in container + hostProc = rootfs + "/proc" // "/rootfs/proc" in container refers to the host proc directory "/proc" + hostMounts = hostProc + "/mounts" // "/rootfs/proc/mounts" in container refers to "/proc/mounts" in the host +) + +func hostJitter(max time.Duration) time.Duration { + hostName, err := os.Hostname() + if err != nil { + hostName = "Unknown" + } + hash := fnv.New64() + hash.Write([]byte(hostName)) + // Right shift the uint64 hash by one to make sure the jitter duration is always positive + hostSleepJitter := time.Duration(int64(hash.Sum64()>>1)) % max + return hostSleepJitter +} + +// execute the refresh() function periodically with the given refresh interval +// until shouldRefresh() return false or the context is canceled +func refreshUntil(ctx context.Context, refresh func(context.Context), refreshInterval time.Duration, + shouldRefresh func() bool, maxJitterTime time.Duration) { + if maxJitterTime > 0 { + // add some sleep jitter to prevent a large number of receivers calling the ec2 api at the same time + time.Sleep(hostJitter(maxJitterTime)) + } + + // initial refresh + refresh(ctx) + + refreshTicker := time.NewTicker(refreshInterval) + defer refreshTicker.Stop() + for { + select { + case <-refreshTicker.C: + if !shouldRefresh() { + return + } + refresh(ctx) + case <-ctx.Done(): + return + } + } +} diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index 54c85b078cc4..86cc6a5c3917 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -25,7 +25,7 @@ import ( ) func TestK8sapiserver(t *testing.T) { - machineInfo := host.NewMachineInfo(time.Minute, zap.NewNop()) + machineInfo, _ := host.NewInfo(time.Minute, zap.NewNop()) k := New(machineInfo, zap.NewNop()) k.start() assert.NotNil(t, k) diff --git a/receiver/awscontainerinsightreceiver/receiver.go b/receiver/awscontainerinsightreceiver/receiver.go index 2c8ae1fc3462..2952503aa499 100644 --- a/receiver/awscontainerinsightreceiver/receiver.go +++ b/receiver/awscontainerinsightreceiver/receiver.go @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package awscontainerinsightreceiver import ( @@ -25,10 +24,6 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/obsreport" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/cadvisor" - hostInfo "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/host" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/k8sapiserver" ) var _ component.MetricsReceiver = (*awsContainerInsightReceiver)(nil) @@ -51,7 +46,9 @@ type awsContainerInsightReceiver struct { func New( logger *zap.Logger, config *Config, - nextConsumer consumer.Metrics) (component.MetricsReceiver, error) { + nextConsumer consumer.Metrics, + cadvisor MetricsProvider, + k8sapiserver MetricsProvider) (component.MetricsReceiver, error) { if nextConsumer == nil { return nil, componenterror.ErrNilNextConsumer } @@ -60,6 +57,8 @@ func New( logger: logger, nextConsumer: nextConsumer, config: config, + cadvisor: cadvisor, + k8sapiserver: k8sapiserver, } return r, nil } @@ -67,10 +66,6 @@ func New( // Start collecting metrics from cadvisor and k8s api server (if it is an elected leader) func (acir *awsContainerInsightReceiver) Start(ctx context.Context, host component.Host) error { ctx, acir.cancel = context.WithCancel(obsreport.ReceiverContext(ctx, acir.config.ID(), "http")) - machineInfo := hostInfo.NewMachineInfo(acir.config.CollectionInterval, acir.logger) - acir.cadvisor = cadvisor.New(acir.config.ContainerOrchestrator, machineInfo, acir.logger) - acir.k8sapiserver = k8sapiserver.New(machineInfo, acir.logger) - // TODO: add more intialization code go func() { diff --git a/receiver/awscontainerinsightreceiver/receiver_test.go b/receiver/awscontainerinsightreceiver/receiver_test.go index b2fa9d329fb6..4f4f1c985ff1 100644 --- a/receiver/awscontainerinsightreceiver/receiver_test.go +++ b/receiver/awscontainerinsightreceiver/receiver_test.go @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package awscontainerinsightreceiver import ( @@ -26,12 +25,21 @@ import ( "go.uber.org/zap" ) +type MockMetricsProvider struct { +} + +func (m *MockMetricsProvider) GetMetrics() []pdata.Metrics { + return []pdata.Metrics{} +} + func TestReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) metricsReceiver, err := New( zap.NewNop(), cfg, consumertest.NewNop(), + &MockCadvisor{}, + &MockCadvisor{}, ) require.NoError(t, err) @@ -53,6 +61,8 @@ func TestReceiverForNilConsumer(t *testing.T) { zap.NewNop(), cfg, nil, + &MockCadvisor{}, + &MockCadvisor{}, ) require.NotNil(t, err) @@ -65,6 +75,8 @@ func TestCollectData(t *testing.T) { zap.NewNop(), cfg, new(consumertest.MetricsSink), + &MockCadvisor{}, + &MockCadvisor{}, ) require.NoError(t, err) @@ -99,6 +111,8 @@ func TestCollectDataWithErrConsumer(t *testing.T) { zap.NewNop(), cfg, consumertest.NewErr(errors.New("an error")), + &MockCadvisor{}, + &MockCadvisor{}, ) require.NoError(t, err)