diff --git a/odiglet/cmd/main.go b/odiglet/cmd/main.go index 0ab9bc88b..6f0cffb8b 100644 --- a/odiglet/cmd/main.go +++ b/odiglet/cmd/main.go @@ -78,7 +78,7 @@ func main() { } odigosNs := k8senv.GetCurrentNamespace() - err = server.StartOpAmpServer(ctx, log.Logger, mgr, clientset, env.Current.NodeName, odigosNs) + err = server.StartOpAmpServer(ctx, log.Logger, mgr, clientset, env.Current.NodeName, odigosNs, nil) if err != nil { log.Logger.Error(err, "Failed to start opamp server") } diff --git a/opampserver/pkg/connection/types.go b/opampserver/pkg/connection/types.go index bb0b86b10..d628f3eba 100644 --- a/opampserver/pkg/connection/types.go +++ b/opampserver/pkg/connection/types.go @@ -23,4 +23,7 @@ type ConnectionInfo struct { // AgentRemoteConfig is the full remote config opamp message to send to the agent when needed AgentRemoteConfig *protobufs.AgentRemoteConfig RemoteResourceAttributes []configresolvers.ResourceAttribute + + // can be nil or a cancel function to stop the eBPF program and release resources when closing the connection + EbpfCloseFunction func() error } diff --git a/opampserver/pkg/server/ebpf.go b/opampserver/pkg/server/ebpf.go new file mode 100644 index 000000000..fabebd7f6 --- /dev/null +++ b/opampserver/pkg/server/ebpf.go @@ -0,0 +1,30 @@ +package server + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" +) + +type EbpfHooks interface { + + // This function is called when a new opamp client connection is established. + // If set, it is expected for this callback to load any eBPF programs needed for instrumentation + // of this client. + // The function should block until the eBPF programs are loaded, either with success or error. + // If an error is returned, the agent will be signaled not to start the and connection will be closed. + // If the function returns nil, the connection will be allowed to proceed, and the eBPF part is assumed ready. + // + // Input: + // - ctx: the context of the request + // - programmingLanguage: the programming language of the agent, as reported by the agent, conforming to otel semconv + // - pid: the process id of the agent process, which is used to inject the eBPF programs + // - serviceName: the service name to use as resource attribute for generated telemetry + // - resourceAttributes: a list of resource attributes to populate in the resource of the generated telemetry + // + // Output: + // - error: if an error occurred during the loading of the eBPF programs + // - cancelFunc: if loaded successfully, a cancel function to be called when the connection is closed to unload the eBPF programs and release resources + // at the moment, errors from cancel are logged and ignored + OnNewInstrumentedProcess(ctx context.Context, programmingLanguage string, pid int64, serviceName string, resourceAttributes []attribute.KeyValue) (func() error, error) +} diff --git a/opampserver/pkg/server/handlers.go b/opampserver/pkg/server/handlers.go index 1d8092689..d577c37c1 100644 --- a/opampserver/pkg/server/handlers.go +++ b/opampserver/pkg/server/handlers.go @@ -31,6 +31,7 @@ type ConnectionHandlers struct { kubeClientSet *kubernetes.Clientset scheme *runtime.Scheme // TODO: revisit this, we should not depend on controller runtime nodeName string + ebpfcb EbpfHooks } type opampAgentAttributesKeys struct { @@ -95,6 +96,16 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin } c.logger.Info("new OpAMP client connected", "deviceId", deviceId, "namespace", k8sAttributes.Namespace, "podName", k8sAttributes.PodName, "instrumentedAppName", instrumentedAppName, "workloadKind", k8sAttributes.WorkloadKind, "workloadName", k8sAttributes.WorkloadName, "containerName", k8sAttributes.ContainerName, "otelServiceName", k8sAttributes.OtelServiceName) + var ebpfCloseFunction func() error + if c.ebpfcb != nil { + otelResourceAttrs := opampResourceAttributesToOtel(remoteResourceAttributes) + closeFunction, err := c.ebpfcb.OnNewInstrumentedProcess(ctx, attrs.ProgrammingLanguage, pid, k8sAttributes.OtelServiceName, otelResourceAttrs) + if err != nil { + return nil, nil, fmt.Errorf("failed to load ebpf instrumentation program: %w", err) + } + ebpfCloseFunction = closeFunction + } + connectionInfo := &connection.ConnectionInfo{ DeviceId: deviceId, Workload: podWorkload, @@ -105,6 +116,7 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin InstrumentedAppName: instrumentedAppName, AgentRemoteConfig: fullRemoteConfig, RemoteResourceAttributes: remoteResourceAttributes, + EbpfCloseFunction: ebpfCloseFunction, } serverToAgent := &protobufs.ServerToAgent{ @@ -132,10 +144,36 @@ func (c *ConnectionHandlers) OnAgentToServerMessage(ctx context.Context, request } func (c *ConnectionHandlers) OnConnectionClosed(ctx context.Context, connectionInfo *connection.ConnectionInfo) { + + if connectionInfo == nil { + // should not happen, safegurad against nil pointer + c.logger.Error(fmt.Errorf("missing connection info"), "OnConnectionClosed called with nil connection info") + return + } + if connectionInfo.EbpfCloseFunction != nil { + // signal the eBPF program to stop and release resources. fire and forget + err := connectionInfo.EbpfCloseFunction() + if err != nil { + c.logger.Error(err, "failed to unload eBPF program") + } + } + // keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting } func (c *ConnectionHandlers) OnConnectionNoHeartbeat(ctx context.Context, connectionInfo *connection.ConnectionInfo) error { + + if connectionInfo == nil { + return fmt.Errorf("missing connection info") + } + if connectionInfo.EbpfCloseFunction != nil { + // signal the eBPF program to stop and release resources. fire and forget + err := connectionInfo.EbpfCloseFunction() + if err != nil { + c.logger.Error(err, "failed to unload eBPF program") + } + } + healthy := false message := fmt.Sprintf("OpAMP server did not receive heartbeat from the agent, last message time: %s", connectionInfo.LastMessageTime.Format("2006-01-02 15:04:05 MST")) // keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting diff --git a/opampserver/pkg/server/server.go b/opampserver/pkg/server/server.go index c8c6b0dbf..6c1dfcf3b 100644 --- a/opampserver/pkg/server/server.go +++ b/opampserver/pkg/server/server.go @@ -17,7 +17,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet *kubernetes.Clientset, nodeName string, odigosNs string) error { +func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet *kubernetes.Clientset, nodeName string, odigosNs string, ebpfcb EbpfHooks) error { listenEndpoint := fmt.Sprintf("0.0.0.0:%d", OpAmpServerDefaultPort) logger.Info("Starting opamp server", "listenEndpoint", listenEndpoint) @@ -39,6 +39,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, kubeClientSet: kubeClientSet, scheme: mgr.GetScheme(), nodeName: nodeName, + ebpfcb: ebpfcb, } http.HandleFunc("POST /v1/opamp", func(w http.ResponseWriter, req *http.Request) { @@ -165,7 +166,7 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager, if err := server.Shutdown(ctx); err != nil { logger.Error(err, "Failed to shut down the http server for incoming connections") } - logger.Info("Shutting down live connections timeout monitor") + logger.Info("Shutting down OpAMP server gracefully due to context cancellation") return case <-ticker.C: // Clean up stale connections diff --git a/opampserver/pkg/server/utils.go b/opampserver/pkg/server/utils.go index d7be2c775..45ab369ce 100644 --- a/opampserver/pkg/server/utils.go +++ b/opampserver/pkg/server/utils.go @@ -3,9 +3,20 @@ package server import ( "strconv" + "github.com/odigos-io/odigos/opampserver/pkg/sdkconfig/configresolvers" "github.com/odigos-io/odigos/opampserver/protobufs" + "go.opentelemetry.io/otel/attribute" ) +func opampResourceAttributesToOtel(opampResourceAttributes []configresolvers.ResourceAttribute) []attribute.KeyValue { + otelAttributes := make([]attribute.KeyValue, 0, len(opampResourceAttributes)) + for _, attr := range opampResourceAttributes { + // TODO: support any type, not just string + otelAttributes = append(otelAttributes, attribute.String(attr.Key, attr.Value)) + } + return otelAttributes +} + func ConvertAnyValueToString(value *protobufs.AnyValue) string { switch v := value.Value.(type) { case *protobufs.AnyValue_StringValue: