diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a4f78f9d3a1..e190d5c0135 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -194,6 +194,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Update the MySQL dashboard to use the Time Series Visual Builder. {pull}5996[5996] - Add experimental uwsgi module. {pull}6006[6006] - Docker and Kubernetes modules are now GA, instead of Beta. {pull}6105[6105] +- Add pct calculated fields for Pod and container CPU and memory usages. {pull}6158[6158] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 030bd9cffaf..4725ddb8535 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -5110,6 +5110,26 @@ type: long CPU used nanocores +[float] +=== `kubernetes.container.cpu.usage.node.pct` + +type: scaled_float + +format: percentage + +CPU usage as a percentage of the total node allocatable CPU + + +[float] +=== `kubernetes.container.cpu.usage.limit.pct` + +type: scaled_float + +format: percentage + +CPU usage as a percentage of the defined limit for the container (or total node allocatable CPU if unlimited) + + [float] == logs fields @@ -5198,6 +5218,26 @@ format: bytes Total memory usage +[float] +=== `kubernetes.container.memory.usage.node.pct` + +type: scaled_float + +format: percentage + +Memory usage as a percentage of the total node allocatable memory + + +[float] +=== `kubernetes.container.memory.usage.limit.pct` + +type: scaled_float + +format: percentage + +Memory usage as a percentage of the defined limit for the container (or total node allocatable memory if unlimited) + + [float] === `kubernetes.container.memory.rss.bytes` @@ -5709,6 +5749,74 @@ type: long Tx errors +[float] +== cpu fields + +CPU usage metrics + + + + +[float] +=== `kubernetes.pod.cpu.usage.nanocores` + +type: long + +CPU used nanocores + + +[float] +=== `kubernetes.pod.cpu.usage.node.pct` + +type: scaled_float + +format: 
percentage + +CPU usage as a percentage of the total node CPU + + +[float] +=== `kubernetes.pod.cpu.usage.limit.pct` + +type: scaled_float + +format: percentage + +CPU usage as a percentage of the defined limit for the pod containers (or total node CPU if unlimited) + + + + +[float] +=== `kubernetes.pod.memory.usage.bytes` + +type: long + +format: bytes + +Total memory usage + + +[float] +=== `kubernetes.pod.memory.usage.node.pct` + +type: scaled_float + +format: percentage + +Memory usage as a percentage of the total node allocatable memory + + +[float] +=== `kubernetes.pod.memory.usage.limit.pct` + +type: scaled_float + +format: percentage + +Memory usage as a percentage of the defined limit for the pod containers (or total node allocatable memory if unlimited) + + [float] == container fields diff --git a/metricbeat/module/kubernetes/_meta/test/stats_summary.json b/metricbeat/module/kubernetes/_meta/test/stats_summary.json index 17c72c648fa..dfb105133fd 100644 --- a/metricbeat/module/kubernetes/_meta/test/stats_summary.json +++ b/metricbeat/module/kubernetes/_meta/test/stats_summary.json @@ -76,7 +76,7 @@ "startTime": "2017-04-18T16:47:44Z", "cpu": { "time": "2017-04-20T08:06:34Z", - "usageNanoCores": 0, + "usageNanoCores": 11263994, "usageCoreNanoSeconds": 43959424 }, "memory": { diff --git a/metricbeat/module/kubernetes/container/_meta/fields.yml b/metricbeat/module/kubernetes/container/_meta/fields.yml index 1b497211e38..a0795098dc2 100644 --- a/metricbeat/module/kubernetes/container/_meta/fields.yml +++ b/metricbeat/module/kubernetes/container/_meta/fields.yml @@ -27,6 +27,16 @@ type: long description: > CPU used nanocores + - name: node.pct + type: scaled_float + format: percentage + description: > + CPU usage as a percentage of the total node allocatable CPU + - name: limit.pct + type: scaled_float + format: percentage + description: > + CPU usage as a percentage of the defined limit for the container (or total node allocatable CPU if unlimited) - name: 
logs type: group description: > @@ -90,6 +100,16 @@ format: bytes description: > Total memory usage + - name: node.pct + type: scaled_float + format: percentage + description: > + Memory usage as a percentage of the total node allocatable memory + - name: limit.pct + type: scaled_float + format: percentage + description: > + Memory usage as a percentage of the defined limit for the container (or total node allocatable memory if unlimited) - name: rss type: group fields: diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 655cae09f8f..97fa1adc013 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -5,6 +5,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/elastic/beats/metricbeat/module/kubernetes/util" ) const ( @@ -55,7 +56,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { return nil, err } - events, err := eventMapping(body) + events, err := eventMapping(body, util.PerfMetrics) if err != nil { return nil, err } diff --git a/metricbeat/module/kubernetes/container/container_test.go b/metricbeat/module/kubernetes/container/container_test.go index a200f8b331e..025256bbfc1 100644 --- a/metricbeat/module/kubernetes/container/container_test.go +++ b/metricbeat/module/kubernetes/container/container_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/kubernetes/util" ) const testFile = "../_meta/test/stats_summary.json" @@ -21,14 +22,19 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - events, err := eventMapping(body) + cache := util.NewPerfMetricsCache() + 
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) + cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) + cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) + + events, err := eventMapping(body, cache) assert.NoError(t, err, "error mapping "+testFile) assert.Len(t, events, 1, "got wrong number of events") testCases := map[string]interface{}{ "cpu.usage.core.ns": 43959424, - "cpu.usage.nanocores": 0, + "cpu.usage.nanocores": 11263994, "logs.available.bytes": 98727014400, "logs.capacity.bytes": 101258067968, @@ -44,6 +50,12 @@ func TestEventMapping(t *testing.T) { "memory.pagefaults": 841, "memory.majorpagefaults": 0, + // calculated pct fields: + "cpu.usage.node.pct": 0.005631997, + "cpu.usage.limit.pct": 0.005631997, + "memory.usage.node.pct": 0.01, + "memory.usage.limit.pct": 0.1, + "name": "nginx", "rootfs.available.bytes": 98727014400, diff --git a/metricbeat/module/kubernetes/container/data.go b/metricbeat/module/kubernetes/container/data.go index 375dc6f9f61..d38bca96a5d 100644 --- a/metricbeat/module/kubernetes/container/data.go +++ b/metricbeat/module/kubernetes/container/data.go @@ -7,9 +7,10 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/kubernetes" + "github.com/elastic/beats/metricbeat/module/kubernetes/util" ) -func eventMapping(content []byte) ([]common.MapStr, error) { +func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache) ([]common.MapStr, error) { events := []common.MapStr{} var summary kubernetes.Summary @@ -19,6 +20,8 @@ func eventMapping(content []byte) ([]common.MapStr, error) { } node := summary.Node + nodeCores := perfMetrics.NodeCoresAllocatable.Get(node.NodeName) + nodeMem := perfMetrics.NodeMemAllocatable.Get(node.NodeName) for _, pod := range summary.Pods { for _, container := range pod.Containers { containerEvent := 
common.MapStr{ @@ -93,6 +96,27 @@ func eventMapping(content []byte) ([]common.MapStr, error) { }, }, } + + if nodeCores > 0 { + containerEvent.Put("cpu.usage.node.pct", float64(container.CPU.UsageNanoCores)/1e9/nodeCores) + } + + if nodeMem > 0 { + containerEvent.Put("memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem) + } + + cuid := util.ContainerUID(pod.PodRef.Namespace, pod.PodRef.Name, container.Name) + coresLimit := perfMetrics.ContainerCoresLimit.GetWithDefault(cuid, nodeCores) + memLimit := perfMetrics.ContainerMemLimit.GetWithDefault(cuid, nodeMem) + + if coresLimit > 0 { + containerEvent.Put("cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/coresLimit) + } + + if memLimit > 0 { + containerEvent.Put("memory.usage.limit.pct", float64(container.Memory.UsageBytes)/memLimit) + } + events = append(events, containerEvent) } diff --git a/metricbeat/module/kubernetes/pod/_meta/fields.yml b/metricbeat/module/kubernetes/pod/_meta/fields.yml index d5f24bf9d02..fc2b4d62e6e 100644 --- a/metricbeat/module/kubernetes/pod/_meta/fields.yml +++ b/metricbeat/module/kubernetes/pod/_meta/fields.yml @@ -35,3 +35,46 @@ type: long description: > Tx errors + - name: cpu + type: group + description: > + CPU usage metrics + fields: + - name: usage + type: group + fields: + - name: nanocores + type: long + description: > + CPU used nanocores + - name: node.pct + type: scaled_float + format: percentage + description: > + CPU usage as a percentage of the total node CPU + - name: limit.pct + type: scaled_float + format: percentage + description: > + CPU usage as a percentage of the defined limit for the pod containers (or total node CPU if unlimited) + - name: memory + type: group + fields: + - name: usage + type: group + fields: + - name: bytes + type: long + format: bytes + description: > + Total memory usage + - name: node.pct + type: scaled_float + format: percentage + description: > + Memory usage as a percentage of the total node allocatable memory 
+ - name: limit.pct + type: scaled_float + format: percentage + description: > + Memory usage as a percentage of the defined limit for the pod containers (or total node allocatable memory if unlimited) diff --git a/metricbeat/module/kubernetes/pod/data.go b/metricbeat/module/kubernetes/pod/data.go index d85a1d936f6..0a8b76831e1 100644 --- a/metricbeat/module/kubernetes/pod/data.go +++ b/metricbeat/module/kubernetes/pod/data.go @@ -7,9 +7,10 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/kubernetes" + "github.com/elastic/beats/metricbeat/module/kubernetes/util" ) -func eventMapping(content []byte) ([]common.MapStr, error) { +func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache) ([]common.MapStr, error) { events := []common.MapStr{} var summary kubernetes.Summary @@ -19,8 +20,20 @@ func eventMapping(content []byte) ([]common.MapStr, error) { } node := summary.Node - + nodeCores := perfMetrics.NodeCoresAllocatable.Get(node.NodeName) + nodeMem := perfMetrics.NodeMemAllocatable.Get(node.NodeName) for _, pod := range summary.Pods { + var usageNanoCores, usageMem int64 + var coresLimit, memLimit float64 + + for _, cont := range pod.Containers { + cuid := util.ContainerUID(pod.PodRef.Namespace, pod.PodRef.Name, cont.Name) + usageNanoCores += cont.CPU.UsageNanoCores + usageMem += cont.Memory.UsageBytes + coresLimit += perfMetrics.ContainerCoresLimit.GetWithDefault(cuid, nodeCores) + memLimit += perfMetrics.ContainerMemLimit.GetWithDefault(cuid, nodeMem) + } + podEvent := common.MapStr{ mb.ModuleDataKey: common.MapStr{ "namespace": pod.PodRef.Namespace, @@ -31,6 +44,18 @@ func eventMapping(content []byte) ([]common.MapStr, error) { "name": pod.PodRef.Name, "start_time": pod.StartTime, + "cpu": common.MapStr{ + "usage": common.MapStr{ + "nanocores": usageNanoCores, + }, + }, + + "memory": common.MapStr{ + "usage": common.MapStr{ + "bytes": usageMem, + }, + }, + 
"network": common.MapStr{ "rx": common.MapStr{ "bytes": pod.Network.RxBytes, @@ -42,6 +67,31 @@ func eventMapping(content []byte) ([]common.MapStr, error) { }, }, } + + if coresLimit > nodeCores { + coresLimit = nodeCores + } + + if memLimit > nodeMem { + memLimit = nodeMem + } + + if nodeCores > 0 { + podEvent.Put("cpu.usage.node.pct", float64(usageNanoCores)/1e9/nodeCores) + } + + if nodeMem > 0 { + podEvent.Put("memory.usage.node.pct", float64(usageMem)/nodeMem) + } + + if coresLimit > 0 { + podEvent.Put("cpu.usage.limit.pct", float64(usageNanoCores)/1e9/coresLimit) + } + + if memLimit > 0 { + podEvent.Put("memory.usage.limit.pct", float64(usageMem)/memLimit) + } + events = append(events, podEvent) } return events, nil diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index ebab685ff3e..74a8a81a26a 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -5,6 +5,7 @@ import ( "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" + "github.com/elastic/beats/metricbeat/module/kubernetes/util" ) const ( @@ -55,7 +56,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { return nil, err } - events, err := eventMapping(body) + events, err := eventMapping(body, util.PerfMetrics) if err != nil { return nil, err } diff --git a/metricbeat/module/kubernetes/pod/pod_test.go b/metricbeat/module/kubernetes/pod/pod_test.go index d0ed5ffb4c2..e527c64a1de 100644 --- a/metricbeat/module/kubernetes/pod/pod_test.go +++ b/metricbeat/module/kubernetes/pod/pod_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/module/kubernetes/util" ) const testFile = "../_meta/test/stats_summary.json" @@ -21,7 +22,12 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test 
file "+testFile) - events, err := eventMapping(body) + cache := util.NewPerfMetricsCache() + cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) + cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) + cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) + + events, err := eventMapping(body, cache) assert.NoError(t, err, "error mapping "+testFile) assert.Len(t, events, 1, "got wrong number of events") @@ -33,6 +39,15 @@ func TestEventMapping(t *testing.T) { "network.rx.errors": 0, "network.tx.bytes": 72447, "network.tx.errors": 0, + + // calculated pct fields: + "cpu.usage.nanocores": 11263994, + "cpu.usage.node.pct": 0.005631997, + "cpu.usage.limit.pct": 0.005631997, + + "memory.usage.bytes": 1462272, + "memory.usage.node.pct": 0.01, + "memory.usage.limit.pct": 0.1, } for k, v := range testCases { diff --git a/metricbeat/module/kubernetes/state_container/data.go b/metricbeat/module/kubernetes/state_container/data.go index efc22b2847a..3816de25689 100644 --- a/metricbeat/module/kubernetes/state_container/data.go +++ b/metricbeat/module/kubernetes/state_container/data.go @@ -39,6 +39,8 @@ func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { case "kube_pod_container_resource_limits_cpu_cores": event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) event.Put("cpu.limit.nanocores", metric.GetGauge().GetValue()*nanocores) + cuid := util.ContainerUID(util.GetLabel(metric, "namespace"), util.GetLabel(metric, "pod"), util.GetLabel(metric, "container")) + util.PerfMetrics.ContainerCoresLimit.Set(cuid, metric.GetGauge().GetValue()) case "kube_pod_container_resource_requests_cpu_cores": event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) @@ -47,6 +49,8 @@ func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { case "kube_pod_container_resource_limits_memory_bytes": 
event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) event.Put("memory.limit.bytes", metric.GetGauge().GetValue()) + cuid := util.ContainerUID(util.GetLabel(metric, "namespace"), util.GetLabel(metric, "pod"), util.GetLabel(metric, "container")) + util.PerfMetrics.ContainerMemLimit.Set(cuid, metric.GetGauge().GetValue()) case "kube_pod_container_resource_requests_memory_bytes": event.Put(mb.ModuleDataKey+".node.name", util.GetLabel(metric, "node")) diff --git a/metricbeat/module/kubernetes/state_node/data.go b/metricbeat/module/kubernetes/state_node/data.go index 138a5625f24..ce336e299a7 100644 --- a/metricbeat/module/kubernetes/state_node/data.go +++ b/metricbeat/module/kubernetes/state_node/data.go @@ -28,12 +28,14 @@ func eventMapping(families []*dto.MetricFamily) ([]common.MapStr, error) { case "kube_node_status_allocatable_cpu_cores": event.Put("cpu.allocatable.cores", metric.GetGauge().GetValue()) + util.PerfMetrics.NodeCoresAllocatable.Set(util.GetLabel(metric, "node"), metric.GetGauge().GetValue()) case "kube_node_status_capacity_cpu_cores": event.Put("cpu.capacity.cores", metric.GetGauge().GetValue()) case "kube_node_status_allocatable_memory_bytes": event.Put("memory.allocatable.bytes", metric.GetGauge().GetValue()) + util.PerfMetrics.NodeMemAllocatable.Set(util.GetLabel(metric, "node"), metric.GetGauge().GetValue()) case "kube_node_status_capacity_memory_bytes": event.Put("memory.capacity.bytes", metric.GetGauge().GetValue()) diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go new file mode 100644 index 00000000000..29bcfaff4f8 --- /dev/null +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -0,0 +1,102 @@ +package util + +import ( + "sync" + "time" +) + +// PerfMetrics stores known metrics from Kubernetes nodes and containers +var PerfMetrics = NewPerfMetricsCache() + +const defaultTimeout = 120 * time.Second + +// NewPerfMetricsCache initializes and returns a 
new PerfMetricsCache +func NewPerfMetricsCache() *PerfMetricsCache { + return &PerfMetricsCache{ + NodeMemAllocatable: newValueMap(defaultTimeout), + NodeCoresAllocatable: newValueMap(defaultTimeout), + + ContainerMemLimit: newValueMap(defaultTimeout), + ContainerCoresLimit: newValueMap(defaultTimeout), + } +} + +// PerfMetricsCache stores known metrics from Kubernetes nodes and containers +type PerfMetricsCache struct { + mutex sync.RWMutex + NodeMemAllocatable *valueMap + NodeCoresAllocatable *valueMap + + ContainerMemLimit *valueMap + ContainerCoresLimit *valueMap +} + +func newValueMap(timeout time.Duration) *valueMap { + return &valueMap{ + values: map[string]value{}, + timeout: timeout, + } +} + +type valueMap struct { + sync.RWMutex + running bool + timeout time.Duration + values map[string]value +} + +type value struct { + value float64 + expires int64 // absolute expiry as Unix time in nanoseconds +} + +// ContainerUID creates a unique ID from namespace, pod name and container name +func ContainerUID(namespace, pod, container string) string { + return namespace + "-" + pod + "-" + container +} + +// Get returns the value stored under name, or 0 if there is none +func (m *valueMap) Get(name string) float64 { + m.RLock() + defer m.RUnlock() + return m.values[name].value +} + +// GetWithDefault returns the value stored under name, or def if there is none +func (m *valueMap) GetWithDefault(name string, def float64) float64 { + m.RLock() + defer m.RUnlock() + val, ok := m.values[name] + if ok { + return val.value + } + return def +} + +// Set stores val under name and sets its expiry to now plus the map timeout +func (m *valueMap) Set(name string, val float64) { + m.Lock() + defer m.Unlock() + m.ensureCleanupWorker() + m.values[name] = value{val, time.Now().Add(m.timeout).UnixNano()} +} + +func (m *valueMap) ensureCleanupWorker() { + if !m.running { + // Run worker to clean up expired entries; it wakes once per timeout period + m.running = true + go func() { + for { + time.Sleep(m.timeout) + now := time.Now().UnixNano() + m.Lock() + for name, val := range m.values { + if now > val.expires { + delete(m.values, name) + } + } + m.Unlock() + } + }() + } +} diff --git 
a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go new file mode 100644 index 00000000000..8c2def169cf --- /dev/null +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -0,0 +1,46 @@ +package util + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestValueMap(t *testing.T) { + test := newValueMap(defaultTimeout) + + // no value: a key that was never set reads as 0 + assert.Equal(t, 0.0, test.Get("foo")) + + // Set a value and read it back + test.Set("foo", 3.14) + assert.Equal(t, 3.14, test.Get("foo")) +} + +func TestGetWithDefault(t *testing.T) { + test := newValueMap(defaultTimeout) + + // Missing key: Get yields 0 while GetWithDefault yields the supplied default + assert.Equal(t, 0.0, test.Get("foo")) + assert.Equal(t, 3.14, test.GetWithDefault("foo", 3.14)) + + // Defined value takes precedence over the default + test.Set("foo", 38.2) + assert.Equal(t, 38.2, test.GetWithDefault("foo", 3.14)) +} + +func TestTimeout(t *testing.T) { + test := newValueMap(20 * time.Millisecond) + + test.Set("foo", 3.14) + assert.Equal(t, 3.14, test.Get("foo")) + + // expired: sleep for three timeout periods so the cleanup worker has had time to run + time.Sleep(60 * time.Millisecond) + assert.Equal(t, 0.0, test.Get("foo")) +} + +func TestContainerUID(t *testing.T) { + assert.Equal(t, "a-b-c", ContainerUID("a", "b", "c")) +}