Skip to content

Commit

Permalink
[processor/k8sattributes] Fixes running with role/rolebinding (open-t…
Browse files Browse the repository at this point in the history
…elemetry#31673)

**Description:** 
This PR allows running the k8sattributes processor with a k8s
role/rolebinding. This can be useful for k8s users w/o access to create
clusterroles and want to enrich pods' telemetry within the scope of a
namespace only. The PR also adds more comprehensive e2e tests including
tests for different RBAC use cases to ensure any changes going forward
do not introduce RBAC incompatibility.

**Link to tracking Issue:**
open-telemetry#14742

**Testing:**
Added e2e tests

**Documentation:** <Describe the documentation added.>
Updated README
  • Loading branch information
jinja2 authored and XinRanZhAWS committed Mar 13, 2024
1 parent ea573b2 commit 86856f2
Show file tree
Hide file tree
Showing 43 changed files with 1,259 additions and 267 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sattr-rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allows k8sattributes processor to work with k8s role/rolebindings when filter::namespace is set.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [14742]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
46 changes: 46 additions & 0 deletions internal/k8stest/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8stest // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8stest"

import (
"errors"
"fmt"

"k8s.io/client-go/discovery"
memory "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
)

type K8sClient struct {
DynamicClient *dynamic.DynamicClient
DiscoveryClient *discovery.DiscoveryClient
Mapper *restmapper.DeferredDiscoveryRESTMapper
}

func NewK8sClient(kubeconfigPath string) (*K8sClient, error) {

if kubeconfigPath == "" {
return nil, errors.New("Please provide file path to load kubeconfig")
}
restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("unable to load kubeconfig from %s: %w", kubeconfigPath, err)
}

dynamicClient, err := dynamic.NewForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("error creating dynamic client: %w", err)
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(restConfig)
if err != nil {
return nil, fmt.Errorf("error creating discovery client: %w", err)
}

mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient))
return &K8sClient{
DynamicClient: dynamicClient, DiscoveryClient: discoveryClient, Mapper: mapper}, nil
}
12 changes: 12 additions & 0 deletions internal/k8stest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,30 @@ require (
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
Expand All @@ -44,6 +55,7 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.4.0 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
Expand Down
22 changes: 22 additions & 0 deletions internal/k8stest/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions internal/k8stest/k8s_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

func CreateCollectorObjects(t *testing.T, client *dynamic.DynamicClient, testID string) []*unstructured.Unstructured {
manifestsDir := filepath.Join(".", "testdata", "e2e", "collector")
func CreateCollectorObjects(t *testing.T, client *K8sClient, testID string, manifestsDir string) []*unstructured.Unstructured {
if manifestsDir == "" {
manifestsDir = filepath.Join(".", "testdata", "e2e", "collector")
}
manifestFiles, err := os.ReadDir(manifestsDir)
require.NoErrorf(t, err, "failed to read collector manifests directory %s", manifestsDir)
host := HostEndpoint(t)
Expand All @@ -35,6 +36,7 @@ func CreateCollectorObjects(t *testing.T, client *dynamic.DynamicClient, testID
require.NoError(t, tmpl.Execute(manifest, map[string]string{
"Name": "otelcol-" + testID,
"HostEndpoint": host,
"TestID": testID,
}))
obj, err := CreateObject(client, manifest.Bytes())
require.NoErrorf(t, err, "failed to create collector object from manifest %s", manifestFile.Name())
Expand All @@ -52,13 +54,13 @@ func CreateCollectorObjects(t *testing.T, client *dynamic.DynamicClient, testID
return createdObjs
}

func WaitForCollectorToStart(t *testing.T, client *dynamic.DynamicClient, podNamespace string, podLabels map[string]any) {
func WaitForCollectorToStart(t *testing.T, client *K8sClient, podNamespace string, podLabels map[string]any) {
podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
listOptions := metav1.ListOptions{LabelSelector: SelectorFromMap(podLabels).String()}
podTimeoutMinutes := 3
t.Logf("waiting for collector pods to be ready")
require.Eventuallyf(t, func() bool {
list, err := client.Resource(podGVR).Namespace(podNamespace).List(context.Background(), listOptions)
list, err := client.DynamicClient.Resource(podGVR).Namespace(podNamespace).List(context.Background(), listOptions)
require.NoError(t, err, "failed to list collector pods")
podsNotReady := len(list.Items)
if podsNotReady == 0 {
Expand Down
48 changes: 28 additions & 20 deletions internal/k8stest/k8s_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,51 @@ package k8stest // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"strings"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/client-go/dynamic"
)

func CreateObject(client *dynamic.DynamicClient, manifest []byte) (*unstructured.Unstructured, error) {
func CreateObject(client *K8sClient, manifest []byte) (*unstructured.Unstructured, error) {
decoder := yaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme)
obj := &unstructured.Unstructured{}
_, gvk, err := decoder.Decode(manifest, nil, obj)
if err != nil {
return nil, err
}
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: strings.ToLower(gvk.Kind + "s"),
gvr, err := client.Mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
return client.Resource(gvr).Namespace(obj.GetNamespace()).Create(context.Background(), obj, metav1.CreateOptions{})
var resource dynamic.ResourceInterface
if gvr.Scope.Name() == meta.RESTScopeNameNamespace {
resource = client.DynamicClient.Resource(gvr.Resource).Namespace(obj.GetNamespace())
} else {
// cluster-scoped resources
resource = client.DynamicClient.Resource(gvr.Resource)
}

return resource.Create(context.Background(), obj, metav1.CreateOptions{})
}

func DeleteObject(client *dynamic.DynamicClient, obj *unstructured.Unstructured) error {
func DeleteObject(client *K8sClient, obj *unstructured.Unstructured) error {
gvk := obj.GroupVersionKind()
gvr := schema.GroupVersionResource{
Group: gvk.Group,
Version: gvk.Version,
Resource: strings.ToLower(gvk.Kind + "s"),
gvr, err := client.Mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return err
}

options := metav1.DeleteOptions{}
policy := metav1.DeletePropagationBackground
if gvk.Kind == "Job" {
options.PropagationPolicy = &policy
var resource dynamic.ResourceInterface
if gvr.Scope.Name() == meta.RESTScopeNameNamespace {
resource = client.DynamicClient.Resource(gvr.Resource).Namespace(obj.GetNamespace())
} else {
// cluster-scoped resources
resource = client.DynamicClient.Resource(gvr.Resource)
}

return client.Resource(gvr).Namespace(obj.GetNamespace()).Delete(context.Background(), obj.GetName(), options)
deletePolicy := metav1.DeletePropagationForeground
return resource.Delete(context.Background(), obj.GetName(), metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
})
}
Loading

0 comments on commit 86856f2

Please sign in to comment.