Skip to content

Commit

Permalink
Merge pull request #5 from pxaws/development
Browse files Browse the repository at this point in the history
Development
  • Loading branch information
yimuniao authored Jun 24, 2020
2 parents cd1d9c7 + 183b8db commit 3d59a97
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 34 deletions.
18 changes: 16 additions & 2 deletions internal/httpclient/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package httpclient

import (
"fmt"
"io"
"io/ioutil"
"log"
"math"
Expand Down Expand Up @@ -61,14 +62,27 @@ func (h *HttpClient) request(endpoint string) ([]byte, error) {
return nil, fmt.Errorf("unable to get response from %s, status code: %d", endpoint, resp.StatusCode)
}

if resp.ContentLength == -1 || resp.ContentLength >= maxHttpResponseLength {
if resp.ContentLength >= maxHttpResponseLength {
return nil, fmt.Errorf("get response with unexpected length from %s, response length: %d", endpoint, resp.ContentLength)
}

body, err := ioutil.ReadAll(resp.Body)
var reader io.Reader
// A ContentLength of -1 indicates that the length is unknown, see https://golang.org/src/net/http/response.go
// In that case we read at most maxHttpResponseLength bytes and treat hitting the limit as an error.
// This can happen with chunked responses from the ECS Introspection API.
if resp.ContentLength == -1 {
reader = io.LimitReader(resp.Body, maxHttpResponseLength)
} else {
reader = resp.Body
}

body, err := ioutil.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("unable to read response body from %s, error: %v", endpoint, err)
}

if len(body) == maxHttpResponseLength {
return nil, fmt.Errorf("response from %s, execeeds the maximum length: %v", endpoint, maxHttpResponseLength)
}
return body, nil
}
23 changes: 22 additions & 1 deletion plugins/inputs/cadvisor/container_info_processor.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package cadvisor

import (
. "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
"path"
"strconv"
"strings"
"time"

. "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"

"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/cadvisor/extractors"
cinfo "github.com/google/cadvisor/info/v1"
)
Expand All @@ -16,6 +18,7 @@ const (
namespaceLable = "io.kubernetes.pod.namespace"
podIdLable = "io.kubernetes.pod.uid"
containerNameLable = "io.kubernetes.container.name"
cadvisorPathPrefix = "kubepods"
)

type podKey struct {
Expand Down Expand Up @@ -70,6 +73,15 @@ func processContainer(info *cinfo.ContainerInfo, detailMode bool, containerOrche
var result []*extractors.CAdvisorMetric
var pKey *podKey

// For the "container in container" case, cadvisor reports multiple stats for the same container, for example:
// /kubepods/burstable/<pod-id>/<container-id>
// /kubepods/burstable/<pod-id>/<container-id>/kubepods/burstable/<pod-id>/<container-id>
// In the example above, the second path belongs to the nested (container-in-container) stats and should be ignored.
keywordCount := strings.Count(info.Name, cadvisorPathPrefix)
if keywordCount > 1 {
return result, pKey
}

tags := map[string]string{}

var containerType string
Expand Down Expand Up @@ -121,6 +133,15 @@ func processContainer(info *cinfo.ContainerInfo, detailMode bool, containerOrche
func processPod(info *cinfo.ContainerInfo, podKeys map[string]podKey) []*extractors.CAdvisorMetric {
var result []*extractors.CAdvisorMetric

// For the "container in container" case, cadvisor reports multiple stats for the same pod, for example:
// /kubepods/burstable/<pod-id>
// /kubepods/burstable/<pod-id>/<container-id>/kubepods/burstable/<pod-id>
// In the example above, the second path belongs to the nested (container-in-container) stats and should be ignored.
keywordCount := strings.Count(info.Name, cadvisorPathPrefix)
if keywordCount > 1 {
return result
}

podKey := getPodKey(info, podKeys)
if podKey == nil {
return result
Expand Down
11 changes: 8 additions & 3 deletions plugins/inputs/cadvisor/extractors/extractor.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package extractors

import (
"fmt"
"log"
"time"

"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
cinfo "github.com/google/cadvisor/info/v1"
"time"
)

const (
Expand Down Expand Up @@ -65,9 +66,13 @@ func (c *CAdvisorMetric) AddTags(tags map[string]string) {
}

func (c *CAdvisorMetric) Merge(src *CAdvisorMetric) {
// If a field exists in both metrics, keep the value from the metric with the earlier timestamp.
for k, v := range src.fields {
if _, ok := c.fields[k]; ok {
panic(fmt.Errorf("metric being merged has conflict in fields, src: %v, dest: %v", *src, *c))
log.Printf("D! metric being merged has conflict in fields, src: %v, dest: %v \n", *src, *c)
if c.tags[containerinsightscommon.Timestamp] < src.tags[containerinsightscommon.Timestamp] {
continue
}
}
c.fields[k] = v
}
Expand Down
16 changes: 16 additions & 0 deletions plugins/inputs/cadvisor/extractors/extractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package extractors

import (
"testing"

"github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon"
"github.com/stretchr/testify/assert"
)

// TestCAdvisorMetric_Merge verifies Merge's conflict resolution: when both
// metrics carry the same field, the value from the metric with the earlier
// timestamp wins, while non-conflicting fields from the argument are copied in.
func TestCAdvisorMetric_Merge(t *testing.T) {
	// The receiver of Merge is the destination; it has the earlier timestamp,
	// so its "value1" must survive the conflict with the later metric.
	earlier := &CAdvisorMetric{fields: map[string]interface{}{"value1": 1, "value2": 2}, tags: map[string]string{containerinsightscommon.Timestamp: "1586331559882"}}
	later := &CAdvisorMetric{fields: map[string]interface{}{"value1": 3, "value3": 3}, tags: map[string]string{containerinsightscommon.Timestamp: "1586331559973"}}

	earlier.Merge(later)

	// Union of both field sets: value1, value2, value3.
	assert.Equal(t, 3, len(earlier.fields))
	// Conflicting field keeps the earlier metric's value.
	assert.Equal(t, 1, earlier.fields["value1"].(int))
	// Non-conflicting fields are preserved or merged in unchanged.
	assert.Equal(t, 2, earlier.fields["value2"].(int))
	assert.Equal(t, 3, earlier.fields["value3"].(int))
}
73 changes: 45 additions & 28 deletions plugins/inputs/k8sapiserver/k8sapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -130,37 +130,54 @@ func (k *K8sAPIServer) Start(acc telegraf.Accumulator) error {
return err
}

go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
log.Printf("I! k8sapiserver OnStartedLeading: %s", k.NodeName)
// we're notified when we start
k.leading = true
},
OnStoppedLeading: func() {
log.Printf("I! k8sapiserver OnStoppedLeading: %s", k.NodeName)
// we can do cleanup here, or after the RunOrDie method returns
k.leading = false
//node and pod are only used for cluster level metrics, endpoint is used for decorator too.
k8sclient.Get().Node.Shutdown()
k8sclient.Get().Pod.Shutdown()
},
},
})
go k.startLeaderElection(ctx, lock)

return nil
}

// startLeaderElection runs the client-go leader-election loop until ctx is
// cancelled. RunOrDie blocks while this instance participates in (or holds)
// the lease; when it returns (e.g. leadership was lost), the outer for loop
// re-enters the election so the agent can become leader again later. The
// loop exits only when ctx.Done() is closed via the select below.
func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourcelock.Interface) {

	for {
		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
			Lock: lock,
			// IMPORTANT: you MUST ensure that any code you have that
			// is protected by the lease must terminate **before**
			// you call cancel. Otherwise, you could have a background
			// loop still running and another process could
			// get elected before your background loop finished, violating
			// the stated goal of the lease.
			LeaseDuration: 60 * time.Second,
			RenewDeadline: 15 * time.Second,
			RetryPeriod:   5 * time.Second,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					log.Printf("I! k8sapiserver OnStartedLeading: %s", k.NodeName)
					// we're notified when we start
					k.leading = true
				},
				OnStoppedLeading: func() {
					log.Printf("I! k8sapiserver OnStoppedLeading: %s", k.NodeName)
					// we can do cleanup here, or after the RunOrDie method returns
					k.leading = false
					// node and pod are only used for cluster level metrics, endpoint is used for decorator too.
					// Shut them down on loss of leadership; they are re-created if we are elected again.
					k8sclient.Get().Node.Shutdown()
					k8sclient.Get().Pod.Shutdown()
				},
				OnNewLeader: func(identity string) {
					// Logged on every leader change, including when another node wins.
					log.Printf("I! k8sapiserver Switch New Leader: %s", identity)
				},
			},
		})

		// Distinguish "leadership lost, retry" from "agent shutting down":
		// a non-blocking check of ctx decides whether to loop again.
		select {
		case <-ctx.Done(): // when leader election ends, the channel ctx.Done() will be closed
			log.Printf("I! k8sapiserver shutdown Leader Election: %s", k.NodeName)
			return
		default:
		}
	}
}

func (k *K8sAPIServer) Stop() {
if k.cancel != nil {
k.cancel()
Expand Down

0 comments on commit 3d59a97

Please sign in to comment.