Skip to content

Commit

Permalink
Add k8s service discovery resource attributes (#9416)
Browse files Browse the repository at this point in the history
* Add k8s service discovery resource attributes

* address feedback
  • Loading branch information
dashpole authored Apr 29, 2022
1 parent 033bc94 commit 814c3a9
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- `attributesprocessor`: Support filter by severity (#9132)
- `processor/transform`: Add transformation of logs (#9368)
- `datadogexporter`: Add `metrics::summaries::mode` to specify export mode for summaries (#8846)
- `prometheusreceiver`: Add resource attributes for kubernetes resource discovery labels (#9416)

### 🧰 Bug fixes 🧰

Expand Down
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/otlp_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (t *transactionPdata) initTransaction(labels labels.Labels) error {
t.job = job
t.instance = instance
}
t.nodeResource = CreateNodeAndResourcePdata(job, instance, metadataCache.SharedLabels().Get(model.SchemeLabel))
t.nodeResource = CreateNodeAndResourcePdata(job, instance, metadataCache.SharedLabels())
t.metricBuilder = newMetricBuilderPdata(metadataCache, t.useStartTimeMetric, t.startTimeMetricRegex, t.logger, t.startTimeMs)
t.isNew = false
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func Test_transaction_pdata(t *testing.T) {
if got := tr.Commit(); got != nil {
t.Errorf("expecting nil from Commit() but got err %v", got)
}
expectedNodeResource := CreateNodeAndResourcePdata("test", "localhost:8080", "http")
l := []labels.Label{{Name: "__scheme__", Value: "http"}}
expectedNodeResource := CreateNodeAndResourcePdata("test", "localhost:8080", l)
mds := sink.AllMetrics()
if len(mds) != 1 {
t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics())
Expand Down
48 changes: 46 additions & 2 deletions receiver/prometheusreceiver/internal/prom_to_otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"net"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
)
Expand All @@ -41,7 +43,7 @@ func isDiscernibleHost(host string) bool {
}

// CreateNodeAndResourcePdata creates the resource data added to OTLP payloads.
func CreateNodeAndResourcePdata(job, instance, scheme string) *pcommon.Resource {
func CreateNodeAndResourcePdata(job, instance string, serviceDiscoveryLabels labels.Labels) *pcommon.Resource {
host, port, err := net.SplitHostPort(instance)
if err != nil {
host = instance
Expand All @@ -54,7 +56,49 @@ func CreateNodeAndResourcePdata(job, instance, scheme string) *pcommon.Resource
}
attrs.UpsertString(conventions.AttributeServiceInstanceID, instance)
attrs.UpsertString(conventions.AttributeNetHostPort, port)
attrs.UpsertString(conventions.AttributeHTTPScheme, scheme)
attrs.UpsertString(conventions.AttributeHTTPScheme, serviceDiscoveryLabels.Get(model.SchemeLabel))

addKubernetesResource(attrs, serviceDiscoveryLabels)

return &resource
}

// kubernetesDiscoveryToResourceAttributes maps from metadata labels discovered
// through the kubernetes implementation of service discovery to opentelemetry
// resource attribute keys.
var kubernetesDiscoveryToResourceAttributes = map[string]string{
"__meta_kubernetes_pod_name": conventions.AttributeK8SPodName,
"__meta_kubernetes_pod_uid": conventions.AttributeK8SPodUID,
"__meta_kubernetes_pod_container_name": conventions.AttributeK8SContainerName,
"__meta_kubernetes_namespace": conventions.AttributeK8SNamespaceName,
// Only one of the node name service discovery labels will be present
"__meta_kubernetes_pod_node_name": conventions.AttributeK8SNodeName,
"__meta_kubernetes_node_name": conventions.AttributeK8SNodeName,
"__meta_kubernetes_endpoint_node_name": conventions.AttributeK8SNodeName,
}

// addKubernetesResource adds resource information detected by prometheus'
// kubernetes service discovery.
func addKubernetesResource(attrs pcommon.Map, serviceDiscoveryLabels labels.Labels) {
for sdKey, attributeKey := range kubernetesDiscoveryToResourceAttributes {
if attr := serviceDiscoveryLabels.Get(sdKey); attr != "" {
attrs.UpsertString(attributeKey, attr)
}
}
controllerName := serviceDiscoveryLabels.Get("__meta_kubernetes_pod_controller_name")
controllerKind := serviceDiscoveryLabels.Get("__meta_kubernetes_pod_controller_kind")
if controllerKind != "" && controllerName != "" {
switch controllerKind {
case "ReplicaSet":
attrs.UpsertString(conventions.AttributeK8SReplicaSetName, controllerName)
case "DaemonSet":
attrs.UpsertString(conventions.AttributeK8SDaemonSetName, controllerName)
case "StatefulSet":
attrs.UpsertString(conventions.AttributeK8SStatefulSetName, controllerName)
case "Job":
attrs.UpsertString(conventions.AttributeK8SJobName, controllerName)
case "CronJob":
attrs.UpsertString(conventions.AttributeK8SCronJobName, controllerName)
}
}
}
201 changes: 192 additions & 9 deletions receiver/prometheusreceiver/internal/prom_to_otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,56 @@ package internal
import (
"testing"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
)

type jobInstanceDefinition struct {
job, instance, host, scheme, port string
}

type k8sResourceDefinition struct {
podName, podUID, container, node, rs, ds, ss, job, cronjob, ns string
}

func makeK8sResource(jobInstance *jobInstanceDefinition, def *k8sResourceDefinition) *pcommon.Resource {
resource := makeResourceWithJobInstanceScheme(jobInstance, true)
attrs := resource.Attributes()
if def.podName != "" {
attrs.UpsertString(conventions.AttributeK8SPodName, def.podName)
}
if def.podUID != "" {
attrs.UpsertString(conventions.AttributeK8SPodUID, def.podUID)
}
if def.container != "" {
attrs.UpsertString(conventions.AttributeK8SContainerName, def.container)
}
if def.node != "" {
attrs.UpsertString(conventions.AttributeK8SNodeName, def.node)
}
if def.rs != "" {
attrs.UpsertString(conventions.AttributeK8SReplicaSetName, def.rs)
}
if def.ds != "" {
attrs.UpsertString(conventions.AttributeK8SDaemonSetName, def.ds)
}
if def.ss != "" {
attrs.UpsertString(conventions.AttributeK8SStatefulSetName, def.ss)
}
if def.job != "" {
attrs.UpsertString(conventions.AttributeK8SJobName, def.job)
}
if def.cronjob != "" {
attrs.UpsertString(conventions.AttributeK8SCronJobName, def.cronjob)
}
if def.ns != "" {
attrs.UpsertString(conventions.AttributeK8SNamespaceName, def.ns)
}
return resource
}

func makeResourceWithJobInstanceScheme(def *jobInstanceDefinition, hasHost bool) *pcommon.Resource {
resource := pcommon.NewResource()
attrs := resource.Attributes()
Expand All @@ -44,64 +86,205 @@ func TestCreateNodeAndResourcePromToOTLP(t *testing.T) {
tests := []struct {
name, job string
instance string
scheme string
sdLabels labels.Labels
want *pcommon.Resource
}{
{
name: "all attributes proper",
job: "job", instance: "hostname:8888", scheme: "http",
job: "job", instance: "hostname:8888", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: "http"}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, true),
},
{
name: "missing port",
job: "job", instance: "myinstance", scheme: "https",
job: "job", instance: "myinstance", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: "https"}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "myinstance", "myinstance", "https", "",
}, true),
},
{
name: "blank scheme",
job: "job", instance: "myinstance:443", scheme: "",
job: "job", instance: "myinstance:443", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: ""}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "myinstance:443", "myinstance", "", "443",
}, true),
},
{
name: "blank instance, blank scheme",
job: "job", instance: "", scheme: "",
job: "job", instance: "", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: ""}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "", "", "", "",
}, true),
},
{
name: "blank instance, non-blank scheme",
job: "job", instance: "", scheme: "http",
job: "job", instance: "", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: "http"}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "", "", "http", "",
}, true),
},
{
name: "0.0.0.0 address",
job: "job", instance: "0.0.0.0:8888", scheme: "http",
job: "job", instance: "0.0.0.0:8888", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: "http"}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "0.0.0.0:8888", "", "http", "8888",
}, false),
},
{
name: "localhost",
job: "job", instance: "localhost:8888", scheme: "http",
job: "job", instance: "localhost:8888", sdLabels: labels.New(labels.Label{Name: "__scheme__", Value: "http"}),
want: makeResourceWithJobInstanceScheme(&jobInstanceDefinition{
"job", "localhost:8888", "", "http", "8888",
}, false),
},
{
name: "kubernetes daemonset pod",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_pod_name", Value: "my-pod-23491"},
labels.Label{Name: "__meta_kubernetes_pod_uid", Value: "84279wretgu89dg489q2"},
labels.Label{Name: "__meta_kubernetes_pod_container_name", Value: "my-container"},
labels.Label{Name: "__meta_kubernetes_pod_node_name", Value: "k8s-node-123"},
labels.Label{Name: "__meta_kubernetes_pod_controller_name", Value: "my-pod"},
labels.Label{Name: "__meta_kubernetes_pod_controller_kind", Value: "DaemonSet"},
labels.Label{Name: "__meta_kubernetes_namespace", Value: "kube-system"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
podName: "my-pod-23491",
podUID: "84279wretgu89dg489q2",
container: "my-container",
node: "k8s-node-123",
ds: "my-pod",
ns: "kube-system",
}),
},
{
name: "kubernetes replicaset pod",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_pod_name", Value: "my-pod-23491"},
labels.Label{Name: "__meta_kubernetes_pod_uid", Value: "84279wretgu89dg489q2"},
labels.Label{Name: "__meta_kubernetes_pod_container_name", Value: "my-container"},
labels.Label{Name: "__meta_kubernetes_pod_node_name", Value: "k8s-node-123"},
labels.Label{Name: "__meta_kubernetes_pod_controller_name", Value: "my-pod"},
labels.Label{Name: "__meta_kubernetes_pod_controller_kind", Value: "ReplicaSet"},
labels.Label{Name: "__meta_kubernetes_namespace", Value: "kube-system"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
podName: "my-pod-23491",
podUID: "84279wretgu89dg489q2",
container: "my-container",
node: "k8s-node-123",
rs: "my-pod",
ns: "kube-system",
}),
},
{
name: "kubernetes statefulset pod",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_pod_name", Value: "my-pod-23491"},
labels.Label{Name: "__meta_kubernetes_pod_uid", Value: "84279wretgu89dg489q2"},
labels.Label{Name: "__meta_kubernetes_pod_container_name", Value: "my-container"},
labels.Label{Name: "__meta_kubernetes_pod_node_name", Value: "k8s-node-123"},
labels.Label{Name: "__meta_kubernetes_pod_controller_name", Value: "my-pod"},
labels.Label{Name: "__meta_kubernetes_pod_controller_kind", Value: "StatefulSet"},
labels.Label{Name: "__meta_kubernetes_namespace", Value: "kube-system"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
podName: "my-pod-23491",
podUID: "84279wretgu89dg489q2",
container: "my-container",
node: "k8s-node-123",
ss: "my-pod",
ns: "kube-system",
}),
},
{
name: "kubernetes job pod",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_pod_name", Value: "my-pod-23491"},
labels.Label{Name: "__meta_kubernetes_pod_uid", Value: "84279wretgu89dg489q2"},
labels.Label{Name: "__meta_kubernetes_pod_container_name", Value: "my-container"},
labels.Label{Name: "__meta_kubernetes_pod_node_name", Value: "k8s-node-123"},
labels.Label{Name: "__meta_kubernetes_pod_controller_name", Value: "my-pod"},
labels.Label{Name: "__meta_kubernetes_pod_controller_kind", Value: "Job"},
labels.Label{Name: "__meta_kubernetes_namespace", Value: "kube-system"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
podName: "my-pod-23491",
podUID: "84279wretgu89dg489q2",
container: "my-container",
node: "k8s-node-123",
job: "my-pod",
ns: "kube-system",
}),
},
{
name: "kubernetes cronjob pod",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_pod_name", Value: "my-pod-23491"},
labels.Label{Name: "__meta_kubernetes_pod_uid", Value: "84279wretgu89dg489q2"},
labels.Label{Name: "__meta_kubernetes_pod_container_name", Value: "my-container"},
labels.Label{Name: "__meta_kubernetes_pod_node_name", Value: "k8s-node-123"},
labels.Label{Name: "__meta_kubernetes_pod_controller_name", Value: "my-pod"},
labels.Label{Name: "__meta_kubernetes_pod_controller_kind", Value: "CronJob"},
labels.Label{Name: "__meta_kubernetes_namespace", Value: "kube-system"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
podName: "my-pod-23491",
podUID: "84279wretgu89dg489q2",
container: "my-container",
node: "k8s-node-123",
cronjob: "my-pod",
ns: "kube-system",
}),
},
{
name: "kubernetes node (e.g. kubelet)",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_node_name", Value: "k8s-node-123"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
node: "k8s-node-123",
}),
},
{
name: "kubernetes service endpoint",
job: "job", instance: "hostname:8888", sdLabels: labels.New(
labels.Label{Name: "__scheme__", Value: "http"},
labels.Label{Name: "__meta_kubernetes_endpoint_node_name", Value: "k8s-node-123"},
),
want: makeK8sResource(&jobInstanceDefinition{
"job", "hostname:8888", "hostname", "http", "8888",
}, &k8sResourceDefinition{
node: "k8s-node-123",
}),
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
got := CreateNodeAndResourcePdata(tt.job, tt.instance, tt.scheme)
got := CreateNodeAndResourcePdata(tt.job, tt.instance, tt.sdLabels)
got.Attributes().Sort()
tt.want.Attributes().Sort()
require.Equal(t, got, tt.want)
})
}
Expand Down
4 changes: 3 additions & 1 deletion receiver/prometheusreceiver/metrics_receiver_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

gokitlog "github.com/go-kit/log"
promcfg "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -152,8 +153,9 @@ func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, er
return mp, nil, err
}
// update attributes value (will use for validation)
l := []labels.Label{{Name: "__scheme__", Value: "http"}}
for _, t := range tds {
t.attributes = internal.CreateNodeAndResourcePdata(t.name, u.Host, "http").Attributes()
t.attributes = internal.CreateNodeAndResourcePdata(t.name, u.Host, l).Attributes()
}
pCfg, err := promcfg.Load(string(cfg), false, gokitlog.NewNopLogger())
return mp, pCfg, err
Expand Down

0 comments on commit 814c3a9

Please sign in to comment.