diff --git a/Gopkg.lock b/Gopkg.lock index 5a4f12706ae65..ac3bb81655054 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -403,6 +403,7 @@ "apis/apps/v1beta1", "apis/apps/v1beta2", "apis/core/v1", + "apis/extensions/v1beta1", "apis/meta/v1", "apis/policy/v1beta1", "apis/resource", @@ -1616,6 +1617,7 @@ "github.com/ericchiang/k8s/apis/apps/v1beta1", "github.com/ericchiang/k8s/apis/apps/v1beta2", "github.com/ericchiang/k8s/apis/core/v1", + "github.com/ericchiang/k8s/apis/extensions/v1beta1", "github.com/ericchiang/k8s/apis/meta/v1", "github.com/ericchiang/k8s/apis/resource", "github.com/ericchiang/k8s/util/intstr", diff --git a/plugins/inputs/kube_inventory/client.go b/plugins/inputs/kube_inventory/client.go index bf207b0ad46d6..5bb2baf5ce412 100644 --- a/plugins/inputs/kube_inventory/client.go +++ b/plugins/inputs/kube_inventory/client.go @@ -8,6 +8,7 @@ import ( "github.com/ericchiang/k8s/apis/apps/v1beta1" "github.com/ericchiang/k8s/apis/apps/v1beta2" "github.com/ericchiang/k8s/apis/core/v1" + v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" "github.com/influxdata/telegraf/internal/tls" ) @@ -61,6 +62,20 @@ func (c *client) getDeployments(ctx context.Context) (*v1beta1.DeploymentList, e return list, c.List(ctx, c.namespace, list) } +func (c *client) getEndpoints(ctx context.Context) (*v1.EndpointsList, error) { + list := new(v1.EndpointsList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + +func (c *client) getIngress(ctx context.Context) (*v1beta1EXT.IngressList, error) { + list := new(v1beta1EXT.IngressList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + func (c *client) getNodes(ctx context.Context) (*v1.NodeList, error) { list := new(v1.NodeList) ctx, cancel := context.WithTimeout(ctx, c.timeout) @@ -89,6 +104,13 @@ func (c *client) getPods(ctx context.Context) (*v1.PodList, error) { return list, c.List(ctx, c.namespace, list) } +func (c *client) getServices(ctx context.Context) (*v1.ServiceList, error) { + list := new(v1.ServiceList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + func (c *client) getStatefulSets(ctx context.Context) (*v1beta1.StatefulSetList, error) { list := new(v1beta1.StatefulSetList) ctx, cancel := context.WithTimeout(ctx, c.timeout) diff --git a/plugins/inputs/kube_inventory/client_test.go b/plugins/inputs/kube_inventory/client_test.go index 4f54755b02362..3e4eaf7522249 100644 --- a/plugins/inputs/kube_inventory/client_test.go +++ b/plugins/inputs/kube_inventory/client_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/ericchiang/k8s/util/intstr" "github.com/influxdata/telegraf/internal/tls" ) @@ -27,6 +28,13 @@ func toBoolPtr(b bool) *bool { return &b } +func toIntStrPtrS(s string) *intstr.IntOrString { + return &intstr.IntOrString{StrVal: &s} +} + +func toIntStrPtrI(i int32) *intstr.IntOrString { + return &intstr.IntOrString{IntVal: &i} +} func TestNewClient(t *testing.T) { _, err := newClient("https://127.0.0.1:443/", "default", "abc123", time.Second, tls.ClientConfig{}) if err != nil { diff --git a/plugins/inputs/kube_inventory/endpoint.go b/plugins/inputs/kube_inventory/endpoint.go new file mode 100644 index 0000000000000..7298789da8e08 --- /dev/null +++ b/plugins/inputs/kube_inventory/endpoint.go @@ -0,0 +1,82 @@ +package kube_inventory + +import ( + "context" + "strings" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectEndpoints(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getEndpoints(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, i := range list.Items { + if err = ki.gatherEndpoint(*i, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherEndpoint(e v1.Endpoints, acc telegraf.Accumulator) error { + if e.Metadata.CreationTimestamp.GetSeconds() == 0 && e.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(e.Metadata.CreationTimestamp.GetSeconds(), int64(e.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": e.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "endpoint_name": e.Metadata.GetName(), + "namespace": e.Metadata.GetNamespace(), + } + + for _, endpoint := range e.GetSubsets() { + for _, readyAddr := range endpoint.GetAddresses() { + fields["ready"] = true + + tags["hostname"] = readyAddr.GetHostname() + tags["node_name"] = readyAddr.GetNodeName() + if readyAddr.TargetRef != nil { + tags[strings.ToLower(readyAddr.GetTargetRef().GetKind())] = readyAddr.GetTargetRef().GetName() + } + + for _, port := range endpoint.GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + for _, notReadyAddr := range endpoint.GetNotReadyAddresses() { + fields["ready"] = false + + tags["hostname"] = notReadyAddr.GetHostname() + tags["node_name"] = notReadyAddr.GetNodeName() + if notReadyAddr.TargetRef != nil { + tags[strings.ToLower(notReadyAddr.GetTargetRef().GetKind())] = notReadyAddr.GetTargetRef().GetName() + } + + for _, port := range endpoint.GetPorts() { + fields["port"] = port.GetPort() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + acc.AddFields(endpointMeasurement, fields, tags) + } + } + } + + return nil +} diff --git a/plugins/inputs/kube_inventory/endpoint_test.go b/plugins/inputs/kube_inventory/endpoint_test.go new file mode 100644 index 0000000000000..b88c388162bd2 --- /dev/null +++ b/plugins/inputs/kube_inventory/endpoint_test.go @@ -0,0 +1,194 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/influxdata/telegraf/testutil" +) + +func TestEndpoint(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no endpoints", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/endpoints/": &v1.EndpointsList{}, + }, + }, + hasError: false, + }, + { + name: "collect ready endpoints", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/endpoints/": &v1.EndpointsList{ + Items: []*v1.Endpoints{ + { + Subsets: []*v1.EndpointSubset{ + { + Addresses: []*v1.EndpointAddress{ + { + Hostname: toStrPtr("storage-6"), + NodeName: toStrPtr("b.storage.internal"), + TargetRef: &v1.ObjectReference{ + Kind: toStrPtr("pod"), + Name: toStrPtr("storage-6"), + }, + }, + }, + Ports: []*v1.EndpointPort{ + { + Name: toStrPtr("server"), + Protocol: toStrPtr("TCP"), + Port: toInt32Ptr(8080), + }, + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("storage"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "ready": true, + "port": int32(8080), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "endpoint_name": "storage", + "namespace": "ns1", + "hostname": "storage-6", + "node_name": "b.storage.internal", + "port_name": "server", + "port_protocol": "TCP", + "pod": "storage-6", + }, + }, + }, + }, + hasError: false, + }, + { + name: "collect notready endpoints", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/endpoints/": &v1.EndpointsList{ + Items: []*v1.Endpoints{ + { + Subsets: []*v1.EndpointSubset{ + { + NotReadyAddresses: []*v1.EndpointAddress{ + { + Hostname: toStrPtr("storage-6"), + NodeName: toStrPtr("b.storage.internal"), + TargetRef: &v1.ObjectReference{ + Kind: toStrPtr("pod"), + Name: toStrPtr("storage-6"), + }, + }, + }, + Ports: []*v1.EndpointPort{ + { + Name: toStrPtr("server"), + Protocol: toStrPtr("TCP"), + Port: toInt32Ptr(8080), + }, + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("storage"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "ready": false, + "port": int32(8080), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "endpoint_name": "storage", + "namespace": "ns1", + "hostname": "storage-6", + "node_name": "b.storage.internal", + "port_name": "server", + "port_protocol": "TCP", + "pod": "storage-6", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, endpoint := range ((v.handler.responseMap["/endpoints/"]).(*v1.EndpointsList)).Items { + err := ks.gatherEndpoint(*endpoint, acc) + if err != nil { + t.Errorf("Failed to gather endpoint - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/ingress.go b/plugins/inputs/kube_inventory/ingress.go new file mode 100644 index 0000000000000..6d5c8019927cf --- /dev/null +++ b/plugins/inputs/kube_inventory/ingress.go @@ -0,0 +1,60 @@ +package kube_inventory + +import ( + "context" + "time" + + v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" + + "github.com/influxdata/telegraf" +) + +func collectIngress(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getIngress(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, i := range list.Items { + if err = ki.gatherIngress(*i, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherIngress(i v1beta1EXT.Ingress, acc telegraf.Accumulator) error { + if i.Metadata.CreationTimestamp.GetSeconds() == 0 && i.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(i.Metadata.CreationTimestamp.GetSeconds(), int64(i.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": i.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "ingress_name": i.Metadata.GetName(), + "namespace": i.Metadata.GetNamespace(), + } + + for _, ingress := range i.GetStatus().GetLoadBalancer().GetIngress() { + tags["hostname"] = ingress.GetHostname() + tags["ip"] = ingress.GetIp() + + for _, rule := range i.GetSpec().GetRules() { + for _, path := range rule.GetIngressRuleValue().GetHttp().GetPaths() { + fields["backend_service_port"] = path.GetBackend().GetServicePort().GetIntVal() + fields["tls"] = i.GetSpec().GetTls() != nil + + tags["backend_service_name"] = path.GetBackend().GetServiceName() + tags["path"] = path.GetPath() + tags["host"] = rule.GetHost() + + acc.AddFields(ingressMeasurement, fields, tags) + } + } + } + + return nil +} diff --git a/plugins/inputs/kube_inventory/ingress_test.go b/plugins/inputs/kube_inventory/ingress_test.go new file mode 100644 index 0000000000000..e3b44512cc11f --- /dev/null +++ b/plugins/inputs/kube_inventory/ingress_test.go @@ -0,0 +1,142 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + v1beta1EXT "github.com/ericchiang/k8s/apis/extensions/v1beta1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/influxdata/telegraf/testutil" +) + +func TestIngress(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no ingress", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/ingress/": &v1beta1EXT.IngressList{}, + }, + }, + hasError: false, + }, + { + name: "collect ingress", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/ingress/": &v1beta1EXT.IngressList{ + Items: []*v1beta1EXT.Ingress{ + { + Status: &v1beta1EXT.IngressStatus{ + LoadBalancer: &v1.LoadBalancerStatus{ + Ingress: []*v1.LoadBalancerIngress{ + { + Hostname: toStrPtr("chron-1"), + Ip: toStrPtr("1.0.0.127"), + }, + }, + }, + }, + Spec: &v1beta1EXT.IngressSpec{ + Rules: []*v1beta1EXT.IngressRule{ + { + Host: toStrPtr("ui.internal"), + IngressRuleValue: &v1beta1EXT.IngressRuleValue{ + Http: &v1beta1EXT.HTTPIngressRuleValue{ + Paths: []*v1beta1EXT.HTTPIngressPath{ + { + Path: toStrPtr("/"), + Backend: &v1beta1EXT.IngressBackend{ + ServiceName: toStrPtr("chronografd"), + ServicePort: toIntStrPtrI(8080), + }, + }, + }, + }, + }, + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("ui-lb"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "tls": false, + "backend_service_port": int32(8080), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "ingress_name": "ui-lb", + "namespace": "ns1", + "ip": "1.0.0.127", + "hostname": "chron-1", + "backend_service_name": "chronografd", + "host": "ui.internal", + "path": "/", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, ingress := range ((v.handler.responseMap["/ingress/"]).(*v1beta1EXT.IngressList)).Items { + err := ks.gatherIngress(*ingress, acc) + if err != nil { + t.Errorf("Failed to gather ingress - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go index 57d31908d2bf1..9ffe0765e71e3 100644 --- a/plugins/inputs/kube_inventory/kube_state.go +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -111,10 +111,13 @@ func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) { var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){ "daemonsets": collectDaemonSets, "deployments": collectDeployments, + "endpoints": collectEndpoints, + "ingress": collectIngress, "nodes": collectNodes, "persistentvolumes": collectPersistentVolumes, "persistentvolumeclaims": collectPersistentVolumeClaims, "pods": collectPods, + "services": collectServices, "statefulsets": collectStatefulSets, } @@ -158,10 +161,13 @@ func convertQuantity(s string, m float64) int64 { var ( daemonSetMeasurement = "kubernetes_daemonset" deploymentMeasurement = "kubernetes_deployment" + endpointMeasurement = "kubernetes_endpoint" + ingressMeasurement = "kubernetes_ingress" nodeMeasurement = "kubernetes_node" persistentVolumeMeasurement = "kubernetes_persistentvolume" persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim" podContainerMeasurement = "kubernetes_pod_container" + serviceMeasurement = "kubernetes_service" statefulSetMeasurement = "kubernetes_statefulset" ) diff --git a/plugins/inputs/kube_inventory/service.go b/plugins/inputs/kube_inventory/service.go new file mode 100644 index 0000000000000..4b0cc08452e23 --- /dev/null +++ b/plugins/inputs/kube_inventory/service.go @@ -0,0 +1,70 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectServices(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getServices(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, i := range list.Items { + if err = ki.gatherService(*i, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherService(s v1.Service, acc telegraf.Accumulator) error { + if s.Metadata.CreationTimestamp.GetSeconds() == 0 && s.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + fields := map[string]interface{}{ + "created": time.Unix(s.Metadata.CreationTimestamp.GetSeconds(), int64(s.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": s.Metadata.GetGeneration(), + } + + tags := map[string]string{ + "service_name": s.Metadata.GetName(), + "namespace": s.Metadata.GetNamespace(), + } + + var getPorts = func() { + for _, port := range s.GetSpec().GetPorts() { + fields["port"] = port.GetPort() + fields["target_port"] = port.GetTargetPort().GetIntVal() + + tags["port_name"] = port.GetName() + tags["port_protocol"] = port.GetProtocol() + + if s.GetSpec().GetType() == "ExternalName" { + tags["external_name"] = s.GetSpec().GetExternalName() + } else { + tags["cluster_ip"] = s.GetSpec().GetClusterIP() + } + + acc.AddFields(serviceMeasurement, fields, tags) + } + } + + if externIPs := s.GetSpec().GetExternalIPs(); externIPs != nil { + for _, ip := range externIPs { + tags["ip"] = ip + + getPorts() + } + } else { + getPorts() + } + + return nil +} diff --git a/plugins/inputs/kube_inventory/service_test.go b/plugins/inputs/kube_inventory/service_test.go new file mode 100644 index 0000000000000..6c0c8787adb8e --- /dev/null +++ b/plugins/inputs/kube_inventory/service_test.go @@ -0,0 +1,123 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/influxdata/telegraf/testutil" +) + +func TestService(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no service", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/service/": &v1.ServiceList{}, + }, + }, + hasError: false, + }, + { + name: "collect service", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/service/": &v1.ServiceList{ + Items: []*v1.Service{ + { + Spec: &v1.ServiceSpec{ + Ports: []*v1.ServicePort{ + { + Port: toInt32Ptr(8080), + TargetPort: toIntStrPtrI(1234), + Name: toStrPtr("diagnostic"), + Protocol: toStrPtr("TCP"), + }, + }, + ExternalIPs: []string{"1.0.0.127"}, + ClusterIP: toStrPtr("127.0.0.1"), + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(12), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("checker"), + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "port": int32(8080), + "target_port": int32(1234), + "generation": int64(12), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "service_name": "checker", + "namespace": "ns1", + "port_name": "diagnostic", + "port_protocol": "TCP", + "cluster_ip": "127.0.0.1", + "ip": "1.0.0.127", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, service := range ((v.handler.responseMap["/service/"]).(*v1.ServiceList)).Items { + err := ks.gatherService(*service, acc) + if err != nil { + t.Errorf("Failed to gather service - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +}