From 03ee09ef3c17ac7ac1b1f3939627630fcc347503 Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Tue, 23 Jun 2020 20:28:23 -0700 Subject: [PATCH 1/3] Add support for chunked response from ECS Introspection API In some edge cases, the chunked response might exceed the max response size that we can handle. We use limited reader to read up to the max size. --- internal/httpclient/httpclient.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/httpclient/httpclient.go b/internal/httpclient/httpclient.go index debef212e2..7a735e03e5 100644 --- a/internal/httpclient/httpclient.go +++ b/internal/httpclient/httpclient.go @@ -2,6 +2,7 @@ package httpclient import ( "fmt" + "io" "io/ioutil" "log" "math" @@ -61,14 +62,27 @@ func (h *HttpClient) request(endpoint string) ([]byte, error) { return nil, fmt.Errorf("unable to get response from %s, status code: %d", endpoint, resp.StatusCode) } - if resp.ContentLength == -1 || resp.ContentLength >= maxHttpResponseLength { + if resp.ContentLength >= maxHttpResponseLength { return nil, fmt.Errorf("get response with unexpected length from %s, response length: %d", endpoint, resp.ContentLength) } - body, err := ioutil.ReadAll(resp.Body) + var reader io.Reader + //value -1 indicates that the length is unknown, see https://golang.org/src/net/http/response.go + //In this case, we read until the limit is reached + //This might happen with chunked responses from ECS Introspection API + if resp.ContentLength == -1 { + reader = io.LimitReader(resp.Body, maxHttpResponseLength) + } else { + reader = resp.Body + } + + body, err := ioutil.ReadAll(reader) if err != nil { return nil, fmt.Errorf("unable to read response body from %s, error: %v", endpoint, err) } + if len(body) == maxHttpResponseLength { + return nil, fmt.Errorf("response from %s, execeeds the maximum length: %v", endpoint, maxHttpResponseLength) + } return body, nil } From e8daa5f5926c5a5f38e0ceb746c141be463e11e4 Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Tue, 23 Jun 2020 20:58:58 -0700 Subject: [PATCH 2/3] Fix the assert failure in cadvisor for container in container case For "container in container" case, cadvisor will emit 2 stats for the same container which will cause assert failure in extractor.Merge(). The fix has two parts, either one of them can solve the problem: * Ignore the stats of cgroup path for "container in container" * Tolerate two same metrics with different timestamps in Merge(). When a conflict is detected, only keep the metric with earlier timestamp --- .../cadvisor/container_info_processor.go | 23 ++++++++++++++++++- .../inputs/cadvisor/extractors/extractor.go | 11 ++++++--- .../cadvisor/extractors/extractor_test.go | 16 +++++++++++++ 3 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 plugins/inputs/cadvisor/extractors/extractor_test.go diff --git a/plugins/inputs/cadvisor/container_info_processor.go b/plugins/inputs/cadvisor/container_info_processor.go index 76311cd594..673c056ce7 100644 --- a/plugins/inputs/cadvisor/container_info_processor.go +++ b/plugins/inputs/cadvisor/container_info_processor.go @@ -1,11 +1,13 @@ package cadvisor import ( - . "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon" "path" "strconv" + "strings" "time" + . "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon" + "github.com/aws/amazon-cloudwatch-agent/plugins/inputs/cadvisor/extractors" cinfo "github.com/google/cadvisor/info/v1" ) @@ -16,6 +18,7 @@ const ( namespaceLable = "io.kubernetes.pod.namespace" podIdLable = "io.kubernetes.pod.uid" containerNameLable = "io.kubernetes.container.name" + cadvisorPathPrefix = "kubepods" ) type podKey struct { @@ -70,6 +73,15 @@ func processContainer(info *cinfo.ContainerInfo, detailMode bool, containerOrche var result []*extractors.CAdvisorMetric var pKey *podKey + // For "container in container" case, cadvisor will provide multiple stats for same container, for example: + // /kubepods/burstable// + // /kubepods/burstable///kubepods/burstable// + // In above example, the second path is actually for the container in container, which should be ignored + keywordCount := strings.Count(info.Name, cadvisorPathPrefix) + if keywordCount > 1 { + return result, pKey + } + tags := map[string]string{} var containerType string @@ -121,6 +133,15 @@ func processContainer(info *cinfo.ContainerInfo, detailMode bool, containerOrche func processPod(info *cinfo.ContainerInfo, podKeys map[string]podKey) []*extractors.CAdvisorMetric { var result []*extractors.CAdvisorMetric + // For "container in container" case, cadvisor will provide multiple stats for same pod, for example: + // /kubepods/burstable/ + // /kubepods/burstable///kubepods/burstable/ + // In above example, the second path is actually for the container in container, which should be ignored + keywordCount := strings.Count(info.Name, cadvisorPathPrefix) + if keywordCount > 1 { + return result + } + podKey := getPodKey(info, podKeys) if podKey == nil { return result diff --git a/plugins/inputs/cadvisor/extractors/extractor.go b/plugins/inputs/cadvisor/extractors/extractor.go index 948f78733f..a275b6a58c 100644 --- a/plugins/inputs/cadvisor/extractors/extractor.go +++ b/plugins/inputs/cadvisor/extractors/extractor.go @@ -1,10 +1,11 @@ package extractors import ( - "fmt" + "log" + "time" + "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon" cinfo "github.com/google/cadvisor/info/v1" - "time" ) const ( @@ -65,9 +66,13 @@ func (c *CAdvisorMetric) AddTags(tags map[string]string) { } func (c *CAdvisorMetric) Merge(src *CAdvisorMetric) { + // If there is any conflict, keep the fields with earlier timestamp for k, v := range src.fields { if _, ok := c.fields[k]; ok { - panic(fmt.Errorf("metric being merged has conflict in fields, src: %v, dest: %v", *src, *c)) + log.Printf("D! metric being merged has conflict in fields, src: %v, dest: %v \n", *src, *c) + if c.tags[containerinsightscommon.Timestamp] < src.tags[containerinsightscommon.Timestamp] { + continue + } } c.fields[k] = v } diff --git a/plugins/inputs/cadvisor/extractors/extractor_test.go b/plugins/inputs/cadvisor/extractors/extractor_test.go new file mode 100644 index 0000000000..5451a55fce --- /dev/null +++ b/plugins/inputs/cadvisor/extractors/extractor_test.go @@ -0,0 +1,16 @@ +package extractors + +import ( + "testing" + + "github.com/aws/amazon-cloudwatch-agent/internal/containerinsightscommon" + "github.com/stretchr/testify/assert" +) + +func TestCAdvisorMetric_Merge(t *testing.T) { + src := &CAdvisorMetric{fields: map[string]interface{}{"value1": 1, "value2": 2}, tags: map[string]string{containerinsightscommon.Timestamp: "1586331559882"}} + dest := &CAdvisorMetric{fields: map[string]interface{}{"value1": 3, "value3": 3}, tags: map[string]string{containerinsightscommon.Timestamp: "1586331559973"}} + src.Merge(dest) + assert.Equal(t, 3, len(src.fields)) + assert.Equal(t, 1, src.fields["value1"].(int)) +} From 183b8dbdfcc7a50020ae800d2273316e0f0d545f Mon Sep 17 00:00:00 2001 From: Ping Xiang Date: Tue, 23 Jun 2020 22:24:47 -0700 Subject: [PATCH 3/3] Enable one Pod to be Leader multiple Times Background: CWAgent is deployed as Daemonset into the cluster to retrieve per Node metrics. But for the control plane KPI Server metrics, only one copy of metrics is needed, otherwise we will get duplicate CP metrics. K8s leaderelection is used so only the leading CWAgent will retrieve the kpiserver metrics: https://github.com/kubernetes/client-go/blob/master/tools/leaderelection/leaderelection.go#L213 However the code above only allows one CWAgent acts as leader once during its lifetime. During the EKS Cluster upgrade, Amazon EKS launching new API server nodes with the updated Kubernetes version to replace the existing ones. When this takes more than 1 minutes, the leading CWAgent will release the leader role and other CWAgent will take the leader role. But in case the cluster fleet is small and the kpi server reboots multiple times (e.g. upgrade twice without upgrade node group), then all CWAgents have acted as leader once and no CWAgent will take the leader role anymore. Fix: Do multiple leader elections in a loop so that a pod can be elected as leader multiple times (A pod can be elected at most once per leader election process. With multiple leader elections, a pod can becomes leader multiple times) --- plugins/inputs/k8sapiserver/k8sapiserver.go | 73 +++++++++++++-------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/k8sapiserver/k8sapiserver.go b/plugins/inputs/k8sapiserver/k8sapiserver.go index 5f28f1bf27..2080306d40 100644 --- a/plugins/inputs/k8sapiserver/k8sapiserver.go +++ b/plugins/inputs/k8sapiserver/k8sapiserver.go @@ -12,7 +12,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/internal/k8sCommon/k8sclient" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -130,37 +130,54 @@ func (k *K8sAPIServer) Start(acc telegraf.Accumulator) error { return err } - go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - // IMPORTANT: you MUST ensure that any code you have that - // is protected by the lease must terminate **before** - // you call cancel. Otherwise, you could have a background - // loop still running and another process could - // get elected before your background loop finished, violating - // the stated goal of the lease. - LeaseDuration: 60 * time.Second, - RenewDeadline: 15 * time.Second, - RetryPeriod: 5 * time.Second, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - log.Printf("I! k8sapiserver OnStartedLeading: %s", k.NodeName) - // we're notified when we start - k.leading = true - }, - OnStoppedLeading: func() { - log.Printf("I! k8sapiserver OnStoppedLeading: %s", k.NodeName) - // we can do cleanup here, or after the RunOrDie method returns - k.leading = false - //node and pod are only used for cluster level metrics, endpoint is used for decorator too. - k8sclient.Get().Node.Shutdown() - k8sclient.Get().Pod.Shutdown() - }, - }, - }) + go k.startLeaderElection(ctx, lock) return nil } +func (k *K8sAPIServer) startLeaderElection(ctx context.Context, lock resourcelock.Interface) { + + for { + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + // IMPORTANT: you MUST ensure that any code you have that + // is protected by the lease must terminate **before** + // you call cancel. Otherwise, you could have a background + // loop still running and another process could + // get elected before your background loop finished, violating + // the stated goal of the lease. + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + log.Printf("I! k8sapiserver OnStartedLeading: %s", k.NodeName) + // we're notified when we start + k.leading = true + }, + OnStoppedLeading: func() { + log.Printf("I! k8sapiserver OnStoppedLeading: %s", k.NodeName) + // we can do cleanup here, or after the RunOrDie method returns + k.leading = false + //node and pod are only used for cluster level metrics, endpoint is used for decorator too. + k8sclient.Get().Node.Shutdown() + k8sclient.Get().Pod.Shutdown() + }, + OnNewLeader: func(identity string) { + log.Printf("I! k8sapiserver Switch New Leader: %s", identity) + }, + }, + }) + + select { + case <-ctx.Done(): //when leader election ends, the channel ctx.Done() will be closed + log.Printf("I! k8sapiserver shutdown Leader Election: %s", k.NodeName) + return + default: + } + } +} + func (k *K8sAPIServer) Stop() { if k.cancel != nil { k.cancel()