Skip to content

Commit

Permalink
Opamp servicename refactor (#1937)
Browse files Browse the repository at this point in the history
Co-authored-by: alonkeyval <alonbraymok007@gmail.com>
  • Loading branch information
alonkeyval and alonbraymok authored Dec 9, 2024
1 parent f421299 commit 43e210a
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 104 deletions.
2 changes: 1 addition & 1 deletion odiglet/debug.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ RUN ARCH_SUFFIX=$(cat /tmp/arch_suffix) && \
unzip opentelemetry-dotnet-instrumentation-linux-glibc-${ARCH_SUFFIX}.zip && \
rm opentelemetry-dotnet-instrumentation-linux-glibc-${ARCH_SUFFIX}.zip

FROM --platform=$BUILDPLATFORM keyval/odiglet-base:v1.5 AS builder
FROM --platform=$BUILDPLATFORM keyval/odiglet-base:v1.7 AS builder
WORKDIR /go/src/github.com/odigos-io/odigos
# Copyy local modules required by the build
COPY api/ api/
Expand Down
24 changes: 10 additions & 14 deletions opampserver/pkg/deviceid/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
)

type K8sResourceAttributes struct {
OtelServiceName string
Namespace string
WorkloadKind string
WorkloadName string
PodName string
ContainerName string
Namespace string
WorkloadKind string
WorkloadName string
PodName string
ContainerName string
}

type DeviceIdCache struct {
Expand Down Expand Up @@ -64,15 +63,12 @@ func (d *DeviceIdCache) GetAttributesFromDevice(ctx context.Context, deviceId st
return nil, pod, err
}

serviceName := d.podInfoResolver.ResolveServiceName(ctx, workloadName, workloadKind, containerDetails)

k8sAttributes := &K8sResourceAttributes{
Namespace: containerDetails.PodNamespace,
PodName: containerDetails.PodName,
ContainerName: containerDetails.ContainerName,
WorkloadKind: workloadKind,
WorkloadName: workloadName,
OtelServiceName: serviceName,
Namespace: containerDetails.PodNamespace,
PodName: containerDetails.PodName,
ContainerName: containerDetails.ContainerName,
WorkloadKind: workloadKind,
WorkloadName: workloadName,
}

d.logger.V(1).Info("resolved device id to container details", "k8sAttributes", k8sAttributes)
Expand Down
56 changes: 0 additions & 56 deletions opampserver/pkg/deviceid/k8sattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package deviceid

import (
"context"
"errors"

"github.com/go-logr/logr"
"github.com/odigos-io/odigos/common/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,60 +24,6 @@ func NewK8sPodInfoResolver(logger logr.Logger, kubeClient *kubernetes.Clientset)
}
}

func (k *K8sPodInfoResolver) getServiceNameFromAnnotation(ctx context.Context, name string, kind string, namespace string) (string, bool) {
obj, err := k.getWorkloadObject(ctx, name, kind, namespace)
if err != nil {
k.logger.Error(err, "failed to get workload object to resolve reported service name annotation. will use fallback service name")
return "", false
}

annotations := obj.GetAnnotations()
if annotations == nil {
// no annotations, so service name is not specified by user. fallback to workload name
return "", false
}

overwrittenName, exists := annotations[consts.OdigosReportedNameAnnotation]
if !exists {
// the is no annotation by user for specific reported service name for this workload
// fallback to workload name
return "", false
}

return overwrittenName, true
}

func (k *K8sPodInfoResolver) getWorkloadObject(ctx context.Context, name string, kind string, namespace string) (metav1.Object, error) {
switch kind {
case "Deployment":
return k.kubeClient.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
case "StatefulSet":
return k.kubeClient.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
case "DaemonSet":
return k.kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, name, metav1.GetOptions{})
case "Pod":
return k.kubeClient.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
}

return nil, errors.New("failed to get workload object for kind: " + kind)
}

// Resolves the service name, with the following priority:
// 1. If the user added reported name annotation to the workload, use it
// 2. Otherwise, use the workload name as service name
//
// if one of the above conditions has err, it will be logged and the next condition will be checked
func (k *K8sPodInfoResolver) ResolveServiceName(ctx context.Context, workloadName string, workloadKind string, containerDetails *ContainerDetails) string {

// we always fetch the fresh service name from the annotation to make sure the most up to date value is returned
serviceName, foundReportedName := k.getServiceNameFromAnnotation(ctx, workloadName, workloadKind, containerDetails.PodNamespace)
if foundReportedName {
return serviceName
}

return workloadName
}

// GetWorkloadNameByOwner gets the workload name and kind from the owner reference
func (k *K8sPodInfoResolver) GetWorkloadNameByOwner(ctx context.Context, podNamespace string, podName string) (string, string, *corev1.Pod, error) {
pod, err := k.kubeClient.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ResourceAttribute struct {
// - pod name
// - container name
// - object name and kind (deployment, statefulset, daemonset, pod)
func CalculateServerAttributes(k8sAttributes *deviceid.K8sResourceAttributes, nodeName string) ([]ResourceAttribute, error) {
func CalculateServerAttributes(k8sAttributes *deviceid.K8sResourceAttributes, nodeName string, serviceName string) ([]ResourceAttribute, error) {

serverOfferResourceAttributes := []ResourceAttribute{
{
Expand All @@ -32,7 +32,7 @@ func CalculateServerAttributes(k8sAttributes *deviceid.K8sResourceAttributes, no
},
{
Key: string(semconv.ServiceNameKey),
Value: k8sAttributes.OtelServiceName,
Value: serviceName,
},
{
Key: string(semconv.K8SPodNameKey),
Expand Down
18 changes: 7 additions & 11 deletions opampserver/pkg/sdkconfig/sdkconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
"slices"

"github.com/go-logr/logr"
"github.com/odigos-io/odigos/api/odigos/v1alpha1"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
Expand Down Expand Up @@ -52,9 +53,10 @@ func NewSdkConfigManager(logger logr.Logger, mgr ctrl.Manager, connectionCache *
return sdkConfigManager
}

func (m *SdkConfigManager) GetFullConfig(ctx context.Context, remoteResourceAttributes []configresolvers.ResourceAttribute, podWorkload *workload.PodWorkload, instrumentedAppName string, programmingLanguage string) (*protobufs.AgentRemoteConfig, error) {
func (m *SdkConfigManager) GetFullConfig(ctx context.Context, remoteResourceAttributes []configresolvers.ResourceAttribute, podWorkload *workload.PodWorkload, instrumentedAppName string, programmingLanguage string,
instrumentationConfig *odigosv1.InstrumentationConfig) (*protobufs.AgentRemoteConfig, error) {

var nodeCollectorGroup v1alpha1.CollectorsGroup
var nodeCollectorGroup odigosv1.CollectorsGroup
err := m.mgr.GetClient().Get(ctx, client.ObjectKey{Name: k8sconsts.OdigosNodeCollectorCollectorGroupName, Namespace: m.odigosNs}, &nodeCollectorGroup)
if err != nil {
return nil, err
Expand All @@ -80,14 +82,8 @@ func (m *SdkConfigManager) GetFullConfig(ctx context.Context, remoteResourceAttr
return nil, err
}

// We are moving towards passing all Instrumentation capabilities unchanged within the instrumentationConfig to the opamp client.
// Gradually, we will migrate the InstrumentationLibraryConfigs and SDK remote config into the instrumentationConfig and the agents to use it.
instrumentationConfig, err := configsections.GetWorkloadInstrumentationConfig(ctx, m.mgr.GetClient(), instrumentedAppName, podWorkload.Namespace)
if err != nil {
m.logger.Error(err, "failed to get instrumentation config", "k8sAttributes", remoteResourceAttributes)
return nil, err
}

// // We are moving towards passing all Instrumentation capabilities unchanged within the instrumentationConfig to the opamp client.
// // Gradually, we will migrate the InstrumentationLibraryConfigs and SDK remote config into the instrumentationConfig and the agents to use it.
opampRemoteConfigInstrumentationConfig, err := configsections.FilterRelevantSdk(instrumentationConfig, programmingLanguage)

agentConfigMap := protobufs.AgentConfigMap{
Expand Down
43 changes: 23 additions & 20 deletions opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
di "github.com/odigos-io/odigos/opampserver/pkg/deviceid"
"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig"
"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configresolvers"
"github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configsections"
"github.com/odigos-io/odigos/opampserver/protobufs"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
return nil, nil, fmt.Errorf("missing programming language in agent description")
}

k8sAttributes, pod, err := c.resolveK8sAttributes(ctx, attrs, deviceId, c.logger)
k8sAttributes, pod, err := c.resolveK8sAttributes(ctx, attrs, deviceId)
if err != nil {
return nil, nil, fmt.Errorf("failed to process k8s attributes: %w", err)
}
Expand All @@ -82,18 +83,28 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
}

instrumentedAppName := workload.CalculateWorkloadRuntimeObjectName(k8sAttributes.WorkloadName, k8sAttributes.WorkloadKind)
remoteResourceAttributes, err := configresolvers.CalculateServerAttributes(k8sAttributes, c.nodeName)
instrumentationConfig, err := configsections.GetWorkloadInstrumentationConfig(ctx, c.kubeclient, instrumentedAppName, podWorkload.Namespace)
if err != nil {
c.logger.Error(err, "failed to get instrumentation config", "instrumentedAppName", instrumentedAppName, "namespace", podWorkload.Namespace)
return nil, nil, err
}

serviceName := instrumentationConfig.Spec.ServiceName
if serviceName == "" {
serviceName = k8sAttributes.WorkloadName
}
remoteResourceAttributes, err := configresolvers.CalculateServerAttributes(k8sAttributes, c.nodeName, serviceName)
if err != nil {
c.logger.Error(err, "failed to calculate server attributes", "k8sAttributes", k8sAttributes)
return nil, nil, err
}

fullRemoteConfig, err := c.sdkConfig.GetFullConfig(ctx, remoteResourceAttributes, &podWorkload, instrumentedAppName, attrs.ProgrammingLanguage)
fullRemoteConfig, err := c.sdkConfig.GetFullConfig(ctx, remoteResourceAttributes, &podWorkload, instrumentedAppName, attrs.ProgrammingLanguage, instrumentationConfig)
if err != nil {
c.logger.Error(err, "failed to get full config", "k8sAttributes", k8sAttributes)
return nil, nil, err
}
c.logger.Info("new OpAMP client connected", "deviceId", deviceId, "namespace", k8sAttributes.Namespace, "podName", k8sAttributes.PodName, "instrumentedAppName", instrumentedAppName, "workloadKind", k8sAttributes.WorkloadKind, "workloadName", k8sAttributes.WorkloadName, "containerName", k8sAttributes.ContainerName, "otelServiceName", k8sAttributes.OtelServiceName)
c.logger.Info("new OpAMP client connected", "deviceId", deviceId, "namespace", k8sAttributes.Namespace, "podName", k8sAttributes.PodName, "instrumentedAppName", instrumentedAppName, "workloadKind", k8sAttributes.WorkloadKind, "workloadName", k8sAttributes.WorkloadName, "containerName", k8sAttributes.ContainerName, "otelServiceName", serviceName)

connectionInfo := &connection.ConnectionInfo{
DeviceId: deviceId,
Expand Down Expand Up @@ -198,12 +209,10 @@ func (c *ConnectionHandlers) UpdateInstrumentationInstanceStatus(ctx context.Con
}

// resolveK8sAttributes resolves K8s resource attributes using either direct attributes from opamp agent or device cache
func (c *ConnectionHandlers) resolveK8sAttributes(ctx context.Context, attrs opampAgentAttributesKeys,
deviceId string, logger logr.Logger) (*di.K8sResourceAttributes, *corev1.Pod, error) {
func (c *ConnectionHandlers) resolveK8sAttributes(ctx context.Context, attrs opampAgentAttributesKeys, deviceId string) (*di.K8sResourceAttributes, *corev1.Pod, error) {

if attrs.hasRequiredAttributes() {
podInfoResolver := di.NewK8sPodInfoResolver(logger, c.kubeClientSet)
return resolveFromDirectAttributes(ctx, attrs, podInfoResolver, c.kubeClientSet)
return resolveFromDirectAttributes(ctx, attrs, c.kubeClientSet)
}
return c.deviceIdCache.GetAttributesFromDevice(ctx, deviceId)
}
Expand Down Expand Up @@ -231,8 +240,7 @@ func (k opampAgentAttributesKeys) hasRequiredAttributes() bool {
return k.ContainerName != "" && k.PodName != "" && k.Namespace != ""
}

func resolveFromDirectAttributes(ctx context.Context, attrs opampAgentAttributesKeys,
podInfoResolver *di.K8sPodInfoResolver, kubeClient *kubernetes.Clientset) (*di.K8sResourceAttributes, *corev1.Pod, error) {
func resolveFromDirectAttributes(ctx context.Context, attrs opampAgentAttributesKeys, kubeClient *kubernetes.Clientset) (*di.K8sResourceAttributes, *corev1.Pod, error) {

pod, err := kubeClient.CoreV1().Pods(attrs.Namespace).Get(ctx, attrs.PodName, metav1.GetOptions{})
if err != nil {
Expand All @@ -250,17 +258,12 @@ func resolveFromDirectAttributes(ctx context.Context, attrs opampAgentAttributes
}
}

serviceName := podInfoResolver.ResolveServiceName(ctx, workloadName, string(workloadKind), &di.ContainerDetails{
PodNamespace: attrs.Namespace,
})

k8sAttributes := &di.K8sResourceAttributes{
Namespace: attrs.Namespace,
PodName: attrs.PodName,
ContainerName: attrs.ContainerName,
WorkloadKind: string(workloadKind),
WorkloadName: workloadName,
OtelServiceName: serviceName,
Namespace: attrs.Namespace,
PodName: attrs.PodName,
ContainerName: attrs.ContainerName,
WorkloadKind: string(workloadKind),
WorkloadName: workloadName,
}

return k8sAttributes, pod, nil
Expand Down

0 comments on commit 43e210a

Please sign in to comment.