Skip to content

Commit

Permalink
fix: fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamir David authored and Tamir David committed Oct 7, 2024
1 parent e79efff commit 8ee6a28
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 67 deletions.
94 changes: 57 additions & 37 deletions instrumentor/controllers/instrumentationdevice/pods_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package instrumentationdevice
import (
"context"
"fmt"
"strings"

common "github.com/odigos-io/odigos/common"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/odigos-io/odigos/k8sutils/pkg/workload"
corev1 "k8s.io/api/core/v1"

"k8s.io/apimachinery/pkg/runtime"
)

const (
EnvVarNamespace = "ODIGOS_CONTAINER_NAMESPACE"
EnvVarNamespace = "ODIGOS_WORKLOAD_NAMESPACE"
EnvVarWorkloadKind = "ODIGOS_WORKLOAD_KIND"
EnvVarContainerName = "ODIGOS_CONTAINER_NAME"
EnvVarPodName = "ODIGOS_POD_NAME"
Expand All @@ -39,51 +41,69 @@ func (p *PodsWebhook) Default(ctx context.Context, obj runtime.Object) error {
}

func injectOdigosEnvVars(pod *corev1.Pod) {

namespace := pod.Namespace
workloadKind := getWorkloadKind(pod)
workloadKind := workload.GetWorkloadKind(pod)

// Common environment variables that do not change across containers
commonEnvVars := []corev1.EnvVar{
{
Name: EnvVarNamespace,
Value: namespace,
},
{
Name: EnvVarWorkloadKind,
Value: workloadKind,
},
{
Name: EnvVarPodName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
}

for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]

envVars := []corev1.EnvVar{
{
Name: EnvVarNamespace,
Value: namespace,
},
{
Name: EnvVarWorkloadKind,
Value: workloadKind,
},
{
Name: EnvVarContainerName,
Value: container.Name,
},
{
Name: EnvVarPodName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
// Check if the container does NOT have device in conatiner limits. If so, skip the environment injection.
if !hasOdigosInstrumentationInLimits(container.Resources) {
continue
}

// Check if the environment variables are already present, if so skip inject them again.
if envVarsExist(container.Env, commonEnvVars) {
continue
}

container.Env = append(container.Env, envVars...)
container.Env = append(container.Env, append(commonEnvVars, corev1.EnvVar{
Name: EnvVarContainerName,
Value: container.Name,
})...)
}
}

func envVarsExist(containerEnv []corev1.EnvVar, commonEnvVars []corev1.EnvVar) bool {
envMap := make(map[string]bool)
for _, envVar := range containerEnv {
envMap[envVar.Name] = true
}

for _, commonEnvVar := range commonEnvVars {
if envMap[commonEnvVar.Name] {
return true
}
}
return false
}

// We can assume ReplicaSet is Deployment because this come after we identify the workload is supported by Odigos
func getWorkloadKind(pod *corev1.Pod) string {
for _, ownerRef := range pod.OwnerReferences {
switch ownerRef.Kind {
case "ReplicaSet":
return "Deployment"
case "StatefulSet":
return "StatefulSet"
case "DaemonSet":
return "DaemonSet"
// Helper function to check if a container's resource limits have a key starting with the specified namespace
func hasOdigosInstrumentationInLimits(resources corev1.ResourceRequirements) bool {
for resourceName := range resources.Limits {
if strings.HasPrefix(string(resourceName), common.OdigosResourceNamespace) {
return true
}
}
return "Unknown"
return false
}
7 changes: 1 addition & 6 deletions k8sutils/pkg/workload/ownerreference.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@ import (
// from the provided owner reference.
func GetWorkloadFromOwnerReference(ownerReference metav1.OwnerReference) (workloadName string, workloadKind WorkloadKind, err error) {

workloadName, workloadKind, err = GetWorkloadNameAndKind(ownerReference.Name, ownerReference.Kind)
if err != nil {
return "", "", err
}

return workloadName, workloadKind, nil
return GetWorkloadNameAndKind(ownerReference.Name, ownerReference.Kind)
}

func GetWorkloadNameAndKind(ownerName, ownerKind string) (string, WorkloadKind, error) {
Expand Down
14 changes: 14 additions & 0 deletions k8sutils/pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,17 @@ func IsInstrumentationDisabledExplicitly(obj client.Object) bool {

return false
}

func GetWorkloadKind(pod *corev1.Pod) string {
for _, ownerRef := range pod.OwnerReferences {
switch ownerRef.Kind {
case "ReplicaSet":
return "Deployment"
case "StatefulSet":
return "StatefulSet"
case "DaemonSet":
return "DaemonSet"
}
}
return "Unknown"
}
46 changes: 25 additions & 21 deletions opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ConnectionHandlers struct {
sdkConfig *sdkconfig.SdkConfigManager
logger logr.Logger
kubeclient client.Client
kubeClientSet *kubernetes.Clientset
scheme *runtime.Scheme // TODO: revisit this, we should not depend on controller runtime
nodeName string
}
Expand All @@ -36,11 +37,10 @@ type opampAgentAttributesKeys struct {
ProgrammingLanguage string
ContainerName string
PodName string
WorkloadKind string
Namespace string
}

func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId string, firstMessage *protobufs.AgentToServer, kubeClient *kubernetes.Clientset) (*connection.ConnectionInfo, *protobufs.ServerToAgent, error) {
func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId string, firstMessage *protobufs.AgentToServer) (*connection.ConnectionInfo, *protobufs.ServerToAgent, error) {

if firstMessage.AgentDescription == nil {
// first message must be agent description.
Expand All @@ -64,13 +64,13 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
return nil, nil, fmt.Errorf("missing pid in agent description")
}

attrs := extractOpampAgentAttributes(firstMessage.AgentDescription.IdentifyingAttributes)
attrs := extractOpampAgentAttributes(firstMessage.AgentDescription)

if attrs.ProgrammingLanguage == "" {
return nil, nil, fmt.Errorf("missing programming language in agent description")
}

k8sAttributes, pod, err := c.resolveK8sAttributes(ctx, attrs, deviceId, kubeClient, c.logger)
k8sAttributes, pod, err := c.resolveK8sAttributes(ctx, attrs, deviceId, c.logger)
if err != nil {
return nil, nil, fmt.Errorf("failed to process k8s attributes: %w", err)
}
Expand Down Expand Up @@ -199,19 +199,19 @@ func (c *ConnectionHandlers) UpdateInstrumentationInstanceStatus(ctx context.Con

// resolveK8sAttributes resolves K8s resource attributes using either direct attributes from opamp agent or device cache
func (c *ConnectionHandlers) resolveK8sAttributes(ctx context.Context, attrs opampAgentAttributesKeys,
deviceId string, kubeClient *kubernetes.Clientset, logger logr.Logger) (*di.K8sResourceAttributes, *corev1.Pod, error) {
deviceId string, logger logr.Logger) (*di.K8sResourceAttributes, *corev1.Pod, error) {

if attrs.hasRequiredAttributes() {
podInfoResolver := di.NewK8sPodInfoResolver(logger, kubeClient)
return resolveFromDirectAttributes(ctx, attrs, podInfoResolver, kubeClient)
podInfoResolver := di.NewK8sPodInfoResolver(logger, c.kubeClientSet)
return resolveFromDirectAttributes(ctx, attrs, podInfoResolver, c.kubeClientSet)
}
return c.deviceIdCache.GetAttributesFromDevice(ctx, deviceId)
}

func extractOpampAgentAttributes(attributes []*protobufs.KeyValue) opampAgentAttributesKeys {
func extractOpampAgentAttributes(agentDescription *protobufs.AgentDescription) opampAgentAttributesKeys {
result := opampAgentAttributesKeys{}

for _, attr := range attributes {
for _, attr := range agentDescription.IdentifyingAttributes {
switch attr.Key {
case string(semconv.TelemetrySDKLanguageKey):
result.ProgrammingLanguage = attr.Value.GetStringValue()
Expand All @@ -221,40 +221,44 @@ func extractOpampAgentAttributes(attributes []*protobufs.KeyValue) opampAgentAtt
result.PodName = attr.Value.GetStringValue()
case string(semconv.K8SNamespaceNameKey):
result.Namespace = attr.Value.GetStringValue()
case "k8s.workload.kind":
result.WorkloadKind = attr.Value.GetStringValue()
}
}

return result
}

func (k opampAgentAttributesKeys) hasRequiredAttributes() bool {
return k.ContainerName != "" && k.PodName != "" && k.WorkloadKind != "" && k.Namespace != ""
return k.ContainerName != "" && k.PodName != "" && k.Namespace != ""
}

func resolveFromDirectAttributes(ctx context.Context, attrs opampAgentAttributesKeys,
podInfoResolver *di.K8sPodInfoResolver, kubeClient *kubernetes.Clientset) (*di.K8sResourceAttributes, *corev1.Pod, error) {

workloadName, _, err := workload.GetWorkloadNameAndKind(attrs.PodName, attrs.WorkloadKind)
pod, err := kubeClient.CoreV1().Pods(attrs.Namespace).Get(ctx, attrs.PodName, metav1.GetOptions{})
if err != nil {
return nil, nil, fmt.Errorf("failed to get workload name and kind: %w", err)
return nil, nil, err
}

serviceName := podInfoResolver.ResolveServiceName(ctx, attrs.PodName, attrs.WorkloadKind, &di.ContainerDetails{
PodName: attrs.PodName,
})
var workloadName string
var workloadKind workload.WorkloadKind

pod, err := kubeClient.CoreV1().Pods(attrs.Namespace).Get(ctx, attrs.PodName, metav1.GetOptions{})
if err != nil {
return nil, nil, err
ownerRefs := pod.GetOwnerReferences()
for _, ownerRef := range ownerRefs {
workloadName, workloadKind, err = workload.GetWorkloadFromOwnerReference(ownerRef)
if err != nil {
return nil, nil, fmt.Errorf("failed to get workload from owner reference: %w", err)
}
}

serviceName := podInfoResolver.ResolveServiceName(ctx, attrs.PodName, workloadName, &di.ContainerDetails{
PodName: attrs.PodName,
})

k8sAttributes := &di.K8sResourceAttributes{
Namespace: attrs.Namespace,
PodName: attrs.PodName,
ContainerName: attrs.ContainerName,
WorkloadKind: attrs.WorkloadKind,
WorkloadKind: string(workloadKind),
WorkloadName: workloadName,
OtelServiceName: serviceName,
}
Expand Down
7 changes: 4 additions & 3 deletions opampserver/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
)

func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClient *kubernetes.Clientset, nodeName string, odigosNs string) error {
func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet *kubernetes.Clientset, nodeName string, odigosNs string) error {

listenEndpoint := fmt.Sprintf("0.0.0.0:%d", OpAmpServerDefaultPort)
logger.Info("Starting opamp server", "listenEndpoint", listenEndpoint)

deviceidCache, err := deviceid.NewDeviceIdCache(logger, kubeClient)
deviceidCache, err := deviceid.NewDeviceIdCache(logger, kubeClientSet)
if err != nil {
return err
}
Expand All @@ -36,6 +36,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
deviceIdCache: deviceidCache,
sdkConfig: sdkConfig,
kubeclient: mgr.GetClient(),
kubeClientSet: kubeClientSet,
scheme: mgr.GetScheme(),
nodeName: nodeName,
}
Expand Down Expand Up @@ -84,7 +85,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
var serverToAgent *protobufs.ServerToAgent
connectionInfo, exists := connectionCache.GetConnection(instanceUid)
if !exists {
connectionInfo, serverToAgent, err = handlers.OnNewConnection(ctx, deviceId, &agentToServer, kubeClient)
connectionInfo, serverToAgent, err = handlers.OnNewConnection(ctx, deviceId, &agentToServer)
if err != nil {
logger.Error(err, "Failed to process new connection")
w.WriteHeader(http.StatusInternalServerError)
Expand Down

0 comments on commit 8ee6a28

Please sign in to comment.