diff --git a/.chloggen/feat_opamp-extension-custom-messages.yaml b/.chloggen/feat_opamp-extension-custom-messages.yaml new file mode 100644 index 000000000000..7e16c6f7aa7b --- /dev/null +++ b/.chloggen/feat_opamp-extension-custom-messages.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Added support for other components to register custom capabilities and receive custom messages from an opamp extension" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32021] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/extension/opampextension/README.md b/extension/opampextension/README.md index 20264a951cfa..e1da42368243 100644 --- a/extension/opampextension/README.md +++ b/extension/opampextension/README.md @@ -43,6 +43,30 @@ extensions: endpoint: wss://127.0.0.1:4320/v1/opamp ``` +## Custom Messages + +Other components may use a configured OpAMP extension to send and receive custom messages to and from an OpAMP server. Components may use the provided `components.Host` from the Start method in order to get a handle to the registry: + +```go +func Start(_ context.Context, host component.Host) error { + ext, ok := host.GetExtensions()[opampExtensionID] + if !ok { + return fmt.Errorf("opamp extension %q does not exist", opampExtensionID) + } + + registry, ok := ext.(opampextension.CustomCapabilityRegistry) + if !ok { + return fmt.Errorf("extension %q is not a custom message registry", opampExtensionID) + } + + // You can now use registry.Register to register a custom capability + + return nil +} +``` + +See the [custom_messages.go](./custom_messages.go) for more information on the custom message API. + ## Status This OpenTelemetry OpAMP agent extension is intended to support the [OpAMP diff --git a/extension/opampextension/custom_messages.go b/extension/opampextension/custom_messages.go new file mode 100644 index 000000000000..b6183964432d --- /dev/null +++ b/extension/opampextension/custom_messages.go @@ -0,0 +1,62 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" + +import "github.com/open-telemetry/opamp-go/protobufs" + +// customCapabilityRegisterOptions represents extra options that can be use in CustomCapabilityRegistry.Register +type customCapabilityRegisterOptions struct { + MaxQueuedMessages int +} + +// defaultCustomCapabilityRegisterOptions returns the default options for CustomCapabilityRegisterOptions +func defaultCustomCapabilityRegisterOptions() *customCapabilityRegisterOptions { + return &customCapabilityRegisterOptions{ + MaxQueuedMessages: 10, + } +} + +// CustomCapabilityRegisterOption represent a single option for CustomCapabilityRegistry.Register +type CustomCapabilityRegisterOption func(*customCapabilityRegisterOptions) + +// withMaxQueuedMessages overrides the maximum number of queued messages. If a message is received while +// MaxQueuedMessages messages are already queued to be processed, the message is dropped. +func withMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption { + return func(c *customCapabilityRegisterOptions) { + c.MaxQueuedMessages = maxQueuedMessages + } +} + +// CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages. +type CustomCapabilityRegistry interface { + // Register registers a new custom capability. + // It returns a CustomMessageHandler, which can be used to send and receive + // messages to/from the OpAMP server. + Register(capability string, opts ...CustomCapabilityRegisterOption) (handler CustomCapabilityHandler, err error) +} + +// CustomCapabilityHandler represents a handler for a custom capability. +// This can be used to send and receive custom messages to/from an OpAMP server. +// It can also be used to unregister the custom capability when it is no longer supported. +type CustomCapabilityHandler interface { + // Message returns a channel that can be used to receive custom messages sent from the OpAMP server. + Message() <-chan *protobufs.CustomMessage + + // SendMessage sends a custom message to the OpAMP server. + // + // Only one message can be sent at a time. If SendCustomMessage has been already called + // for any capability, and the message is still pending (in progress) + // then subsequent calls to SendCustomMessage will return github.com/open-telemetry/opamp-go/types.ErrCustomMessagePending + // and a channel that will be closed when the pending message is sent. + // To ensure that the previous send is complete and it is safe to send another CustomMessage, + // the caller should wait for the returned channel to be closed before attempting to send another custom message. + // + // If no error is returned, the channel returned will be closed after the specified + // message is sent. + SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) + + // Unregister unregisters the custom capability. After this method returns, SendMessage will always return an error, + // and Message will no longer receive further custom messages. + Unregister() +} diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 968bd216392e..4f96d8247767 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -43,8 +43,12 @@ type opampAgent struct { agentDescription *protobufs.AgentDescription opampClient client.OpAMPClient + + customCapabilityRegistry *customCapabilityRegistry } +var _ CustomCapabilityRegistry = (*opampAgent)(nil) + func (o *opampAgent) Start(ctx context.Context, _ component.Host) error { header := http.Header{} for k, v := range o.cfg.Server.GetHeaders() { @@ -121,6 +125,10 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error return nil } +func (o *opampAgent) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) { + return o.customCapabilityRegistry.Register(capability, opts...) +} + func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) { o.eclk.Lock() defer o.eclk.Unlock() @@ -162,14 +170,16 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r } } + opampClient := cfg.Server.GetClient(logger) agent := &opampAgent{ - cfg: cfg, - logger: logger, - agentType: agentType, - agentVersion: agentVersion, - instanceID: uid, - capabilities: cfg.Capabilities, - opampClient: cfg.Server.GetClient(logger), + cfg: cfg, + logger: logger, + agentType: agentType, + agentVersion: agentVersion, + instanceID: uid, + capabilities: cfg.Capabilities, + opampClient: opampClient, + customCapabilityRegistry: newCustomCapabilityRegistry(logger, opampClient), } return agent, nil @@ -256,15 +266,17 @@ func (o *opampAgent) composeEffectiveConfig() *protobufs.EffectiveConfig { } func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) { - if msg.AgentIdentification == nil { - return - } + if msg.AgentIdentification != nil { + instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) + if err != nil { + o.logger.Error("Failed to parse a new agent identity", zap.Error(err)) + return + } - instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) - if err != nil { - o.logger.Error("Failed to parse a new agent identity", zap.Error(err)) - return + o.updateAgentIdentity(instanceID) } - o.updateAgentIdentity(instanceID) + if msg.CustomMessage != nil { + o.customCapabilityRegistry.ProcessMessage(msg.CustomMessage) + } } diff --git a/extension/opampextension/registry.go b/extension/opampextension/registry.go new file mode 100644 index 000000000000..4c97c5500c20 --- /dev/null +++ b/extension/opampextension/registry.go @@ -0,0 +1,205 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension" + +import ( + "container/list" + "errors" + "fmt" + "slices" + "sync" + + "github.com/open-telemetry/opamp-go/protobufs" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +// customCapabilityClient is a subset of OpAMP client containing only the methods needed for the customCapabilityRegistry. +type customCapabilityClient interface { + SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error + SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) +} + +type customCapabilityRegistry struct { + mux *sync.Mutex + capabilityToMsgChannels map[string]*list.List + client customCapabilityClient + logger *zap.Logger +} + +var _ CustomCapabilityRegistry = (*customCapabilityRegistry)(nil) + +func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry { + return &customCapabilityRegistry{ + mux: &sync.Mutex{}, + capabilityToMsgChannels: make(map[string]*list.List), + client: client, + logger: logger, + } +} + +// Register implements CustomCapabilityRegistry.Register +func (cr *customCapabilityRegistry) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) { + optsStruct := defaultCustomCapabilityRegisterOptions() + for _, opt := range opts { + opt(optsStruct) + } + + cr.mux.Lock() + defer cr.mux.Unlock() + + capabilities := cr.capabilities() + if !slices.Contains(capabilities, capability) { + capabilities = append(capabilities, capability) + } + + err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ + Capabilities: capabilities, + }) + if err != nil { + return nil, fmt.Errorf("set custom capabilities: %w", err) + } + + capabilityList := cr.capabilityToMsgChannels[capability] + if capabilityList == nil { + capabilityList = list.New() + cr.capabilityToMsgChannels[capability] = capabilityList + } + + msgChan := make(chan *protobufs.CustomMessage, optsStruct.MaxQueuedMessages) + callbackElem := capabilityList.PushBack(msgChan) + + unregisterFunc := cr.removeCapabilityFunc(capability, callbackElem) + sender := newCustomMessageHandler(cr, cr.client, capability, msgChan, unregisterFunc) + + return sender, nil +} + +// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered capability handlers for +// the messages capability. +func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) { + cr.mux.Lock() + defer cr.mux.Unlock() + + msgChannels, ok := cr.capabilityToMsgChannels[cm.Capability] + if !ok { + return + } + + for node := msgChannels.Front(); node != nil; node = node.Next() { + msgChan, ok := node.Value.(chan *protobufs.CustomMessage) + if !ok { + continue + } + + // If the channel is full, we will skip sending the message to the receiver. + // We do this because we don't want a misbehaving component to be able to + // block the opamp extension, or block other components from receiving messages. + select { + case msgChan <- cm: + default: + } + } +} + +// removeCapabilityFunc returns a func that removes the custom capability with the given msg channel list element and sender, +// then recalculates and sets the list of custom capabilities on the OpAMP client. +func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element) func() { + return func() { + cr.mux.Lock() + defer cr.mux.Unlock() + + msgChanList := cr.capabilityToMsgChannels[capability] + msgChanList.Remove(callbackElement) + + if msgChanList.Front() == nil { + // Since there are no more callbacks for this capability, + // this capability is no longer supported + delete(cr.capabilityToMsgChannels, capability) + } + + capabilities := cr.capabilities() + err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{ + Capabilities: capabilities, + }) + if err != nil { + // It's OK if we couldn't actually remove the capability, it just means we won't + // notify the server properly, and the server may send us messages that we have no associated callbacks for. + cr.logger.Error("Failed to set new capabilities", zap.Error(err)) + } + } +} + +// capabilities gives the current set of custom capabilities with at least one +// callback registered. +func (cr *customCapabilityRegistry) capabilities() []string { + return maps.Keys(cr.capabilityToMsgChannels) +} + +type customMessageHandler struct { + // unregisteredMux protects unregistered, and makes sure that a message cannot be sent + // on an unregistered capability. + unregisteredMux *sync.Mutex + + capability string + opampClient customCapabilityClient + registry *customCapabilityRegistry + sendChan <-chan *protobufs.CustomMessage + unregisterCapabilityFunc func() + + unregistered bool +} + +var _ CustomCapabilityHandler = (*customMessageHandler)(nil) + +func newCustomMessageHandler( + registry *customCapabilityRegistry, + opampClient customCapabilityClient, + capability string, + sendChan <-chan *protobufs.CustomMessage, + unregisterCapabilityFunc func(), +) *customMessageHandler { + return &customMessageHandler{ + unregisteredMux: &sync.Mutex{}, + + capability: capability, + opampClient: opampClient, + registry: registry, + sendChan: sendChan, + unregisterCapabilityFunc: unregisterCapabilityFunc, + } +} + +// Message implements CustomCapabilityHandler.Message +func (c *customMessageHandler) Message() <-chan *protobufs.CustomMessage { + return c.sendChan +} + +// SendMessage implements CustomCapabilityHandler.SendMessage +func (c *customMessageHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) { + c.unregisteredMux.Lock() + defer c.unregisteredMux.Unlock() + + if c.unregistered { + return nil, errors.New("capability has already been unregistered") + } + + cm := &protobufs.CustomMessage{ + Capability: c.capability, + Type: messageType, + Data: message, + } + + return c.opampClient.SendCustomMessage(cm) +} + +// Unregister implements CustomCapabilityHandler.Unregister +func (c *customMessageHandler) Unregister() { + c.unregisteredMux.Lock() + defer c.unregisteredMux.Unlock() + + c.unregistered = true + + c.unregisterCapabilityFunc() +} diff --git a/extension/opampextension/registry_test.go b/extension/opampextension/registry_test.go new file mode 100644 index 000000000000..79ed2bf23332 --- /dev/null +++ b/extension/opampextension/registry_test.go @@ -0,0 +1,285 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package opampextension + +import ( + "errors" + "fmt" + "testing" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestRegistry_Register(t *testing.T) { + t.Run("Registers successfully", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + + client := mockCustomCapabilityClient{ + setCustomCapabilites: func(customCapabilities *protobufs.CustomCapabilities) error { + require.Equal(t, + &protobufs.CustomCapabilities{ + Capabilities: []string{capabilityString}, + }, + customCapabilities) + return nil + }, + } + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + sender, err := registry.Register(capabilityString) + require.NoError(t, err) + require.NotNil(t, sender) + }) + + t.Run("Setting capabilities fails", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + capabilityErr := errors.New("network error") + + client := mockCustomCapabilityClient{ + setCustomCapabilites: func(_ *protobufs.CustomCapabilities) error { + return capabilityErr + }, + } + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + sender, err := registry.Register(capabilityString) + require.Nil(t, sender) + require.ErrorIs(t, err, capabilityErr) + require.Len(t, registry.capabilityToMsgChannels, 0, "Setting capability failed, but callback ended up in the map anyways") + }) +} + +func TestRegistry_ProcessMessage(t *testing.T) { + t.Run("Calls registered callback", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + sender, err := registry.Register(capabilityString) + require.NotNil(t, sender) + require.NoError(t, err) + + registry.ProcessMessage(customMessage) + + require.Equal(t, customMessage, <-sender.Message()) + }) + + t.Run("Skips blocked message channels", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + sender, err := registry.Register(capabilityString, withMaxQueuedMessages(0)) + require.NotNil(t, sender) + require.NoError(t, err) + + // If we did not skip sending on blocked channels, we'd expect this to never return. + registry.ProcessMessage(customMessage) + + require.Equal(t, 0, len(sender.Message())) + }) + + t.Run("Callback is called only for its own capability", func(t *testing.T) { + teapotCapabilityString1 := "io.opentelemetry.teapot" + coffeeMakerCapabilityString2 := "io.opentelemetry.coffeeMaker" + + messageType1 := "steep" + messageBytes1 := []byte("blackTea") + + messageType2 := "brew" + messageBytes2 := []byte("blackCoffee") + + customMessageSteep := &protobufs.CustomMessage{ + Capability: teapotCapabilityString1, + Type: messageType1, + Data: messageBytes1, + } + + customMessageBrew := &protobufs.CustomMessage{ + Capability: coffeeMakerCapabilityString2, + Type: messageType2, + Data: messageBytes2, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + teapotSender, err := registry.Register(teapotCapabilityString1) + require.NotNil(t, teapotSender) + require.NoError(t, err) + + coffeeMakerSender, err := registry.Register(coffeeMakerCapabilityString2) + require.NotNil(t, coffeeMakerSender) + require.NoError(t, err) + + registry.ProcessMessage(customMessageSteep) + registry.ProcessMessage(customMessageBrew) + + require.Equal(t, customMessageSteep, <-teapotSender.Message()) + require.Empty(t, teapotSender.Message()) + require.Equal(t, customMessageBrew, <-coffeeMakerSender.Message()) + require.Empty(t, coffeeMakerSender.Message()) + }) +} + +func TestCustomCapability_SendMesage(t *testing.T) { + t.Run("Sends message", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "brew" + mesageBytes := []byte("black") + + client := mockCustomCapabilityClient{ + sendCustomMessage: func(message *protobufs.CustomMessage) (chan struct{}, error) { + require.Equal(t, &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + }, message) + return nil, nil + }, + } + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + sender, err := registry.Register(capabilityString) + require.NoError(t, err) + require.NotNil(t, sender) + + channel, err := sender.SendMessage(messageType, mesageBytes) + require.NoError(t, err) + require.Nil(t, channel, nil) + }) +} + +func TestCustomCapability_Unregister(t *testing.T) { + t.Run("Unregistered capability callback is no longer called", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredSender, err := registry.Register(capabilityString) + require.NotNil(t, unregisteredSender) + require.NoError(t, err) + + unregisteredSender.Unregister() + + registry.ProcessMessage(customMessage) + + select { + case <-unregisteredSender.Message(): + t.Fatalf("Unregistered capability should not be called") + default: // OK + } + }) + + t.Run("Unregister is successful even if set capabilities fails", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + customMessage := &protobufs.CustomMessage{ + Capability: capabilityString, + Type: messageType, + Data: mesageBytes, + } + + client := &mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredSender, err := registry.Register(capabilityString) + require.NotNil(t, unregisteredSender) + require.NoError(t, err) + + client.setCustomCapabilites = func(_ *protobufs.CustomCapabilities) error { + return fmt.Errorf("failed to set capabilities") + } + + unregisteredSender.Unregister() + + registry.ProcessMessage(customMessage) + + select { + case <-unregisteredSender.Message(): + t.Fatalf("Unregistered capability should not be called") + default: // OK + } + }) + + t.Run("Does not send if unregistered", func(t *testing.T) { + capabilityString := "io.opentelemetry.teapot" + messageType := "steep" + mesageBytes := []byte("blackTea") + + client := mockCustomCapabilityClient{} + + registry := newCustomCapabilityRegistry(zap.NewNop(), client) + + unregisteredSender, err := registry.Register(capabilityString) + require.NotNil(t, unregisteredSender) + require.NoError(t, err) + + unregisteredSender.Unregister() + + _, err = unregisteredSender.SendMessage(messageType, mesageBytes) + require.ErrorContains(t, err, "capability has already been unregistered") + + select { + case <-unregisteredSender.Message(): + t.Fatalf("Unregistered capability should not be called") + default: // OK + } + }) +} + +type mockCustomCapabilityClient struct { + sendCustomMessage func(message *protobufs.CustomMessage) (chan struct{}, error) + setCustomCapabilites func(customCapabilities *protobufs.CustomCapabilities) error +} + +func (m mockCustomCapabilityClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error { + if m.setCustomCapabilites != nil { + return m.setCustomCapabilites(customCapabilities) + } + return nil +} + +func (m mockCustomCapabilityClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { + if m.sendCustomMessage != nil { + return m.sendCustomMessage(message) + } + + return make(chan struct{}), nil +}