Skip to content

Commit

Permalink
Remove k8s.cluster.name from eks detector (#2898)
Browse files Browse the repository at this point in the history
Turns out the cluster detection was not reliable as it doesn't work with a
vanilla EKS cluster. Requires some sort of cloudwatch addon. Can't find a
suitable replacement so removing the code. Users can still add it manually
using OTEL_RESOURCE_ATTRIBUTES or some other resource modifying processor.
  • Loading branch information
jrcamp authored Mar 26, 2021
1 parent 5325d7f commit 88782b0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 195 deletions.
162 changes: 6 additions & 156 deletions processor/resourcedetectionprocessor/internal/aws/eks/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ package eks

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"

"go.opentelemetry.io/collector/component"
Expand All @@ -35,176 +29,32 @@ const (
// TypeStr is type of detector.
TypeStr = "eks"

/* #nosec */
k8sTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
k8sCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
authConfigmapPath = "https://kubernetes.default.svc/api/v1/namespaces/kube-system/configmaps/aws-auth"
cwConfigmapPath = "https://kubernetes.default.svc/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info"
// Environment variable that is set when running on Kubernetes.
kubernetesServiceHostEnvVar = "KUBERNETES_SERVICE_HOST"
)

// detectorUtils is used for testing the resourceDetector by abstracting functions that rely on external systems.
type detectorUtils interface {
// fileExists returns true if the file exists, otherwise false.
fileExists(filename string) bool
// fetchString executes an HTTP request with a given HTTP Method and URL string returning
// the content body or an error.
fetchString(ctx context.Context, httpMethod string, URL string) (string, error)
}

// This struct will implement the detectorUtils interface
type eksDetectorUtils struct{}

// This struct will help unmarshal clustername from JSON response
type data struct {
ClusterName string `json:"cluster.name"`
}

var _ internal.Detector = (*Detector)(nil)

// Detector for EKS
type Detector struct {
utils detectorUtils
}

// Compile time assertion that eksDetectorUtils implements the detectorUtils interface.
var _ detectorUtils = (*eksDetectorUtils)(nil)
type Detector struct{}

// NewDetector returns a resource detector that will detect AWS EKS resources.
func NewDetector(_ component.ProcessorCreateParams, _ internal.DetectorConfig) (internal.Detector, error) {
return &Detector{utils: &eksDetectorUtils{}}, nil
return &Detector{}, nil
}

// Detect returns a Resource describing the Amazon EKS environment being run in.
func (detector *Detector) Detect(ctx context.Context) (pdata.Resource, error) {
res := pdata.NewResource()
isEks, err := isEKS(ctx, detector.utils)
if err != nil {
return res, err
}

// Return empty resource object if not running in EKS
if !isEks {
// Check if running on k8s.
if os.Getenv(kubernetesServiceHostEnvVar) == "" {
return res, nil
}

attr := res.Attributes()
attr.InsertString(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS)
attr.InsertString(conventions.AttributeCloudPlatform, conventions.AttributeCloudPlatformAWSEKS)

// Get clusterName and append to attributes
clusterName, err := getClusterName(ctx, detector.utils)
if err != nil {
return res, err
}
if clusterName != "" {
attr.InsertString(conventions.AttributeK8sCluster, clusterName)
}

return res, nil
}

// isEKS checks if the current environment is running in EKS.
func isEKS(ctx context.Context, utils detectorUtils) (bool, error) {
if !isK8s(utils) {
return false, nil
}

// Make HTTP GET request
awsAuth, err := utils.fetchString(ctx, http.MethodGet, authConfigmapPath)
if err != nil {
return false, fmt.Errorf("isEks() error retrieving auth configmap: %w", err)
}

return awsAuth != "", nil
}

// isK8s checks if the current environment is running in a Kubernetes environment
func isK8s(utils detectorUtils) bool {
return utils.fileExists(k8sTokenPath) && utils.fileExists(k8sCertPath)
}

// fileExists returns true if the file exists, otherwise false.
func (eksUtils eksDetectorUtils) fileExists(filename string) bool {
info, err := os.Stat(filename)
return err == nil && !info.IsDir()
}

// fetchString executes an HTTP request with a given HTTP Method and URL string returning
// the content body or an error.
func (eksUtils eksDetectorUtils) fetchString(ctx context.Context, httpMethod string, URL string) (string, error) {
request, err := http.NewRequestWithContext(ctx, httpMethod, URL, nil)
if err != nil {
return "", fmt.Errorf("failed to create new HTTP request with method=%s, URL=%s: %w", httpMethod, URL, err)
}

// Set HTTP request header with authentication credentials
k8sToken, err := getK8sToken()
if err != nil {
return "", err
}
request.Header.Set("Authorization", "Bearer "+k8sToken)

// Get certificate
caCert, err := ioutil.ReadFile(k8sCertPath)
if err != nil {
return "", fmt.Errorf("failed to read file with path %s", k8sCertPath)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

// Set HTTP request timeout and add certificate
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
},
},
}

response, err := client.Do(request)
if err != nil || response.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to execute HTTP request with method=%s, URL=%s, Status Code=%d: %w", httpMethod, URL, response.StatusCode, err)
}

// Retrieve response body from HTTP request
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("failed to read response from HTTP request with method=%s, URL=%s: %w", httpMethod, URL, err)
}

return string(body), nil
}

// getK8sToken retrieves the kubernetes credential information.
func getK8sToken() (string, error) {
content, err := ioutil.ReadFile(k8sTokenPath)
if err != nil {
return "", fmt.Errorf("getK8sToken() error: cannot read file with path %s", k8sTokenPath)
}

return string(content), nil
}

// getClusterName retrieves the clusterName resource attribute
func getClusterName(ctx context.Context, utils detectorUtils) (string, error) {
resp, err := utils.fetchString(ctx, "GET", cwConfigmapPath)
if err != nil {
return "", fmt.Errorf("getClusterName() error: %w", err)
}

// parse JSON object returned from HTTP request
var respmap map[string]json.RawMessage
err = json.Unmarshal([]byte(resp), &respmap)
if err != nil {
return "", fmt.Errorf("getClusterName() error: cannot parse JSON: %w", err)
}
var d data
err = json.Unmarshal(respmap["data"], &d)
if err != nil {
return "", fmt.Errorf("getClusterName() error: cannot parse JSON: %w", err)
}

clusterName := d.ClusterName

return clusterName, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,45 @@ package eks

import (
"context"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)

type MockDetectorUtils struct {
mock.Mock
}

// Mock function for fileExists()
func (detectorUtils *MockDetectorUtils) fileExists(filename string) bool {
args := detectorUtils.Called(filename)
return args.Bool(0)
}

// Mock function for fetchString()
func (detectorUtils *MockDetectorUtils) fetchString(ctx context.Context, httpMethod string, URL string) (string, error) {
args := detectorUtils.Called(ctx, httpMethod, URL)
return args.String(0), args.Error(1)
}

func TestNewDetector(t *testing.T) {
detector, err := NewDetector(component.ProcessorCreateParams{Logger: zap.NewNop()}, nil)
assert.NoError(t, err)
assert.NotNil(t, detector)
}

// Tests EKS resource detector running in EKS environment
func TestEks(t *testing.T) {
detectorUtils := &MockDetectorUtils{}
func TestEKS(t *testing.T) {
ctx := context.Background()

// Mock functions and set expectations
detectorUtils.On("fileExists", k8sTokenPath).Return(true)
detectorUtils.On("fileExists", k8sCertPath).Return(true)
detectorUtils.On("fetchString", ctx, "GET", authConfigmapPath).Return("not empty", nil)
detectorUtils.On("fetchString", ctx, "GET", cwConfigmapPath).Return(`{"data":{"cluster.name":"my-cluster"}}`, nil)
require.NoError(t, os.Setenv("KUBERNETES_SERVICE_HOST", "localhost"))

// Call EKS Resource detector to detect resources
eksResourceDetector := &Detector{utils: detectorUtils}
eksResourceDetector := &Detector{}
res, err := eksResourceDetector.Detect(ctx)
require.NoError(t, err)

assert.Equal(t, map[string]interface{}{
"cloud.provider": "aws",
"cloud.platform": "aws_eks",
"k8s.cluster.name": "my-cluster",
"cloud.provider": "aws",
"cloud.platform": "aws_eks",
}, internal.AttributesToMap(res.Attributes()), "Resource object returned is incorrect")
detectorUtils.AssertExpectations(t)
}

// Tests EKS resource detector not running in EKS environment
func TestNotEKS(t *testing.T) {
detectorUtils := new(MockDetectorUtils)

/* #nosec */
k8sTokenPath := "/var/run/secrets/kubernetes.io/serviceaccount/token"

// Mock functions and set expectations
detectorUtils.On("fileExists", k8sTokenPath).Return(false)

detector := Detector{detectorUtils}
detector := Detector{}
require.NoError(t, os.Unsetenv("KUBERNETES_SERVICE_HOST"))
r, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, 0, r.Attributes().Len(), "Resource object should be empty")
detectorUtils.AssertExpectations(t)
}

0 comments on commit 88782b0

Please sign in to comment.