diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 41d77146e0a..8ccf59259bb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -131,6 +131,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Do not report Metricbeat container host as hostname in Kubernetes deployment. {issue}7199[7199] - Ensure metadata updates don't replace existing pod metrics. {pull}7573[7573] - Fix kubernetes pct fields reporting. {pull}7677[7677] +- Add support for new `kube_node_status_condition` in Kubernetes `state_node`. {pull}7699[7699] *Packetbeat* diff --git a/metricbeat/helper/prometheus/metric.go b/metricbeat/helper/prometheus/metric.go index 26b13fbaf2d..716ea1a34af 100644 --- a/metricbeat/helper/prometheus/metric.go +++ b/metricbeat/helper/prometheus/metric.go @@ -29,6 +29,9 @@ import ( // MetricMap defines the mapping from Prometheus metric to a Metricbeat field type MetricMap interface { + // GetOptions returns the list of metric options + GetOptions() []MetricOption + // GetField returns the resulting field name GetField() string @@ -36,53 +39,84 @@ type MetricMap interface { GetValue(m *dto.Metric) interface{} } +// MetricOption adds settings to Metric objects behavior +type MetricOption interface { + // Process a tuple of field, value and labels from a metric, return the same tuple updated + Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) +} + +// OpFilter only processes metrics matching the given filter +func OpFilter(filter map[string]string) MetricOption { + return opFilter{ + labels: filter, + } +} + +// OpLowercaseValue lowercases the value if it's a string +func OpLowercaseValue() MetricOption { + return opLowercaseValue{} +} + // Metric directly maps a Prometheus metric to a Metricbeat field -func Metric(field string) MetricMap { +func Metric(field string, options ...MetricOption) MetricMap { return &commonMetric{ - field: field, + field: field, + options: options, } } // KeywordMetric maps a Prometheus metric to a Metricbeat field, stores the // given keyword when source metric value is 1 -func KeywordMetric(field, keyword string) MetricMap { +func KeywordMetric(field, keyword string, options ...MetricOption) MetricMap { return &keywordMetric{ commonMetric{ - field: field, + field: field, + options: options, }, keyword, } } // BooleanMetric maps a Prometheus metric to a Metricbeat field of bool type -func BooleanMetric(field string) MetricMap { +func BooleanMetric(field string, options ...MetricOption) MetricMap { return &booleanMetric{ commonMetric{ - field: field, + field: field, + options: options, }, } } // LabelMetric maps a Prometheus metric to a Metricbeat field, stores the value // of a given label on it if the gauge value is 1 -func LabelMetric(field, label string, lowercase bool) MetricMap { +func LabelMetric(field, label string, options ...MetricOption) MetricMap { return &labelMetric{ commonMetric{ - field: field, + field: field, + options: options, }, label, - lowercase, } } // InfoMetric obtains info labels from the given metric and puts them // into events matching all the key labels present in the metric -func InfoMetric() MetricMap { - return &infoMetric{} +func InfoMetric(options ...MetricOption) MetricMap { + return &infoMetric{ + commonMetric{ + options: options, + }, + } } type commonMetric struct { - field string + field string + options []MetricOption +} + +// GetOptions returns the list of metric options +func (m *commonMetric) GetOptions() []MetricOption { + return m.options } // GetField returns the resulting field name @@ -176,18 +210,13 @@ func (m *booleanMetric) GetValue(metric *dto.Metric) interface{} { type labelMetric struct { commonMetric - label string - lowercase bool + label string } // GetValue returns the resulting value func (m *labelMetric) GetValue(metric *dto.Metric) interface{} { if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { - value := getLabel(metric, m.label) - if m.lowercase { - return strings.ToLower(value) - } - return value + return getLabel(metric, m.label) } return nil } @@ -201,7 +230,9 @@ func getLabel(metric *dto.Metric, name string) string { return "" } -type infoMetric struct{} +type infoMetric struct { + commonMetric +} // GetValue returns the resulting value func (m *infoMetric) GetValue(metric *dto.Metric) interface{} { @@ -212,3 +243,27 @@ func (m *infoMetric) GetValue(metric *dto.Metric) interface{} { func (m *infoMetric) GetField() string { return "" } + +type opFilter struct { + labels map[string]string +} + +// Process will return nil if labels don't match the filter +func (o opFilter) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + for k, v := range o.labels { + if labels[k] != v { + return "", nil, nil + } + } + return field, value, labels +} + +type opLowercaseValue struct{} + +// Process will lowercase the given value if it's a string +func (o opLowercaseValue) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + if val, ok := value.(string); ok { + value = strings.ToLower(val) + } + return field, value, labels +} diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index f2664d9e854..27456b09f4b 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -128,10 +128,16 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS continue } + // Apply extra options + allLabels := getLabels(metric) + for _, option := range m.GetOptions() { + field, value, allLabels = option.Process(field, value, allLabels) + } + // Convert labels labels := common.MapStr{} keyLabels := common.MapStr{} - for k, v := range getLabels(metric) { + for k, v := range allLabels { if l, ok := mapping.Labels[k]; ok { if l.IsKey() { keyLabels.Put(l.GetField(), v) @@ -151,10 +157,12 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS continue } - // Put it in the event if it's a common metric - event := getEvent(eventsMap, keyLabels) - event.Put(field, value) - event.DeepUpdate(labels) + if field != "" { + // Put it in the event if it's a common metric + event := getEvent(eventsMap, keyLabels) + event.Put(field, value) + event.DeepUpdate(labels) + } } } diff --git a/metricbeat/helper/prometheus/prometheus_test.go b/metricbeat/helper/prometheus/prometheus_test.go index 0b2547b01f5..2a2e8c09568 100644 --- a/metricbeat/helper/prometheus/prometheus_test.go +++ b/metricbeat/helper/prometheus/prometheus_test.go @@ -227,7 +227,7 @@ func TestPrometheus(t *testing.T) { msg: "Label metrics", mapping: &MetricsMapping{ Metrics: map[string]MetricMap{ - "first_metric": LabelMetric("first.metric", "label3", false), + "first_metric": LabelMetric("first.metric", "label3"), }, Labels: map[string]LabelMap{ "label1": Label("labels.label1"), @@ -248,7 +248,7 @@ func TestPrometheus(t *testing.T) { msg: "Label metrics, lowercase", mapping: &MetricsMapping{ Metrics: map[string]MetricMap{ - "first_metric": LabelMetric("first.metric", "label4", true), + "first_metric": LabelMetric("first.metric", "label4", OpLowercaseValue()), }, Labels: map[string]LabelMap{ "label1": Label("labels.label1"), @@ -265,6 +265,20 @@ func TestPrometheus(t *testing.T) { }, }, }, + { + msg: "Label metrics, filter", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": LabelMetric("first.metric", "label4", OpLowercaseValue(), OpFilter(map[string]string{ + "foo": "filtered", + })), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + }, + }, + expected: []common.MapStr{}, + }, { msg: "Summary metric", mapping: &MetricsMapping{ @@ -318,16 +332,19 @@ func TestPrometheus(t *testing.T) { } for _, test := range tests { - reporter := &mbtest.CapturingReporterV2{} - p.ReportProcessedMetrics(test.mapping, reporter) - assert.Nil(t, reporter.GetErrors(), test.msg) - // Sort slice to avoid randomness - res := reporter.GetEvents() - sort.Slice(res, func(i, j int) bool { - return res[i].MetricSetFields.String() < res[j].MetricSetFields.String() + t.Run(test.msg, func(t *testing.T) { + reporter := &mbtest.CapturingReporterV2{} + p.ReportProcessedMetrics(test.mapping, reporter) + assert.Nil(t, reporter.GetErrors(), test.msg) + // Sort slice to avoid randomness + res := reporter.GetEvents() + sort.Slice(res, func(i, j int) bool { + return res[i].MetricSetFields.String() < res[j].MetricSetFields.String() + }) + assert.Equal(t, len(test.expected), len(res)) + for j, ev := range res { + assert.Equal(t, test.expected[j], ev.MetricSetFields, test.msg) + } }) - for j, ev := range res { - assert.Equal(t, test.expected[j], ev.MetricSetFields, test.msg) - } } } diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index 59c490b2502..2343319d570 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -53,8 +53,8 @@ var ( "kube_pod_container_status_running": p.KeywordMetric("status.phase", "running"), "kube_pod_container_status_terminated": p.KeywordMetric("status.phase", "terminated"), "kube_pod_container_status_waiting": p.KeywordMetric("status.phase", "waiting"), - "kube_pod_container_status_terminated_reason": p.LabelMetric("status.reason", "reason", false), - "kube_pod_container_status_waiting_reason": p.LabelMetric("status.reason", "reason", false), + "kube_pod_container_status_terminated_reason": p.LabelMetric("status.reason", "reason"), + "kube_pod_container_status_waiting_reason": p.LabelMetric("status.reason", "reason"), }, Labels: map[string]p.LabelMap{ diff --git a/metricbeat/module/kubernetes/state_node/_meta/test/kube-state-metrics.expected b/metricbeat/module/kubernetes/state_node/_meta/test/kube-state-metrics.expected new file mode 100644 index 00000000000..90b0abf2dd0 --- /dev/null +++ b/metricbeat/module/kubernetes/state_node/_meta/test/kube-state-metrics.expected @@ -0,0 +1,66 @@ +[ + { + "_namespace": "node", + "cpu": { + "allocatable": { + "cores": 3 + }, + "capacity": { + "cores": 4 + } + }, + "memory": { + "allocatable": { + "bytes": 3097786880 + }, + "capacity": { + "bytes": 4097786880 + } + }, + "name": "minikube-test", + "pod": { + "allocatable": { + "total": 210 + }, + "capacity": { + "total": 310 + } + }, + "status": { + "ready": "true", + "unschedulable": true + } + }, + { + "_namespace": "node", + "cpu": { + "allocatable": { + "cores": 2 + }, + "capacity": { + "cores": 2 + } + }, + "memory": { + "allocatable": { + "bytes": 2097786880 + }, + "capacity": { + "bytes": 2097786880 + } + }, + "name": "minikube", + "pod": { + "allocatable": { + "total": 110 + }, + "capacity": { + "total": 110 + } + }, + "status": { + "ready": "true", + "unschedulable": false + } + } +] \ No newline at end of file diff --git a/metricbeat/module/kubernetes/state_node/_meta/test/kube-state-metrics.v1.3.0.expected b/metricbeat/module/kubernetes/state_node/_meta/test/kube-state-metrics.v1.3.0.expected new file mode 100644 index 00000000000..2d13051b048 --- /dev/null +++ b/metricbeat/module/kubernetes/state_node/_meta/test/kube-state-metrics.v1.3.0.expected @@ -0,0 +1,34 @@ +[ + { + "_namespace": "node", + "cpu": { + "allocatable": { + "cores": 2 + }, + "capacity": { + "cores": 2 + } + }, + "memory": { + "allocatable": { + "bytes": 1992347648 + }, + "capacity": { + "bytes": 2097205248 + } + }, + "name": "minikube", + "pod": { + "allocatable": { + "total": 110 + }, + "capacity": { + "total": 110 + } + }, + "status": { + "ready": "true", + "unschedulable": false + } + } +] \ No newline at end of file diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index b77cc7207ed..ab9e2b2e386 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -47,7 +47,11 @@ var ( "kube_node_status_capacity_cpu_cores": p.Metric("cpu.capacity.cores"), "kube_node_status_allocatable_cpu_cores": p.Metric("cpu.allocatable.cores"), "kube_node_spec_unschedulable": p.BooleanMetric("status.unschedulable"), - "kube_node_status_ready": p.LabelMetric("status.ready", "condition", false), + "kube_node_status_ready": p.LabelMetric("status.ready", "condition"), + "kube_node_status_condition": p.LabelMetric("status.ready", "status", + p.OpFilter(map[string]string{ + "condition": "Ready", + })), }, Labels: map[string]p.LabelMap{ diff --git a/metricbeat/module/kubernetes/state_node/state_node_test.go b/metricbeat/module/kubernetes/state_node/state_node_test.go index 4662e360a25..1696e0f8116 100644 --- a/metricbeat/module/kubernetes/state_node/state_node_test.go +++ b/metricbeat/module/kubernetes/state_node/state_node_test.go @@ -20,108 +20,22 @@ package state_node import ( - "io/ioutil" - "net/http" - "net/http/httptest" - "os" "testing" - "github.com/elastic/beats/libbeat/common" - mbtest "github.com/elastic/beats/metricbeat/mb/testing" - - "github.com/stretchr/testify/assert" + "github.com/elastic/beats/metricbeat/helper/prometheus/ptest" ) -const testFile = "../_meta/test/kube-state-metrics" - func TestEventMapping(t *testing.T) { - file, err := os.Open(testFile) - assert.NoError(t, err, "cannot open test file "+testFile) - - body, err := ioutil.ReadAll(file) - assert.NoError(t, err, "cannot read test file "+testFile) - - server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1") - w.Write([]byte(body)) - })) - - server.Start() - defer server.Close() - - config := map[string]interface{}{ - "module": "kubernetes", - "metricsets": []string{"state_node"}, - "hosts": []string{server.URL}, - } - - f := mbtest.NewEventsFetcher(t, config) - - events, err := f.Fetch() - assert.NoError(t, err) - - assert.Equal(t, 2, len(events), "Wrong number of returned events") - - testCases := testCases() - for _, event := range events { - name, err := event.GetValue("name") - if err == nil { - eventKey := name.(string) - oneTestCase, oneTestCaseFound := testCases[eventKey] - if oneTestCaseFound { - for k, v := range oneTestCase { - testValue(eventKey, t, event, k, v) - } - delete(testCases, eventKey) - } - } - } - - if len(testCases) > 0 { - t.Errorf("Test reference events not found: %v, \n\ngot: %v", testCases, events) - } -} - -func testValue(eventKey string, t *testing.T, event common.MapStr, field string, expected interface{}) { - data, err := event.GetValue(field) - assert.NoError(t, err, eventKey+": Could not read field "+field) - assert.EqualValues(t, expected, data, eventKey+": Wrong value for field "+field) -} - -func testCases() map[string]map[string]interface{} { - return map[string]map[string]interface{}{ - "minikube": { - "_namespace": "node", - "name": "minikube", - - "status.ready": "true", - "status.unschedulable": false, - - "cpu.allocatable.cores": 2, - "cpu.capacity.cores": 2, - - "memory.allocatable.bytes": 2097786880, - "memory.capacity.bytes": 2097786880, - - "pod.allocatable.total": 110, - "pod.capacity.total": 110, - }, - "minikube-test": { - "_namespace": "node", - "name": "minikube-test", - - "status.ready": "true", - "status.unschedulable": true, - - "cpu.allocatable.cores": 3, - "cpu.capacity.cores": 4, - - "memory.allocatable.bytes": 3097786880, - "memory.capacity.bytes": 4097786880, - - "pod.allocatable.total": 210, - "pod.capacity.total": 310, + ptest.TestMetricSetEventsFetcher(t, "kubernetes", "state_node", + ptest.TestCases{ + { + MetricsFile: "../_meta/test/kube-state-metrics", + ExpectedFile: "./_meta/test/kube-state-metrics.expected", + }, + { + MetricsFile: "../_meta/test/kube-state-metrics.v1.3.0", + ExpectedFile: "./_meta/test/kube-state-metrics.v1.3.0.expected", + }, }, - } + ) } diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index 6f6236cb8f5..db4dd5782de 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -40,9 +40,9 @@ var ( mapping = &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_pod_info": p.InfoMetric(), - "kube_pod_status_phase": p.LabelMetric("status.phase", "phase", true), - "kube_pod_status_ready": p.LabelMetric("status.ready", "condition", true), - "kube_pod_status_scheduled": p.LabelMetric("status.scheduled", "condition", true), + "kube_pod_status_phase": p.LabelMetric("status.phase", "phase", p.OpLowercaseValue()), + "kube_pod_status_ready": p.LabelMetric("status.ready", "condition", p.OpLowercaseValue()), + "kube_pod_status_scheduled": p.LabelMetric("status.scheduled", "condition", p.OpLowercaseValue()), }, Labels: map[string]p.LabelMap{