Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/opamp]: Custom Message Support #32281

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5182859
custom message interface
BinaryFissionGames Mar 27, 2024
b5fd947
opamp custom messages prototype
BinaryFissionGames Apr 1, 2024
3901617
finish registry + tests
BinaryFissionGames Apr 9, 2024
b83d7c1
ensure extension is a valid registry
BinaryFissionGames Apr 9, 2024
0c1fe09
add license
BinaryFissionGames Apr 9, 2024
8b5f549
add check for unregistered in Unregister
BinaryFissionGames Apr 9, 2024
e204bcf
process custom messages with the registry when received.
BinaryFissionGames Apr 10, 2024
47f1987
add chlog
BinaryFissionGames Apr 10, 2024
e277407
add readme section about custom messages
BinaryFissionGames Apr 10, 2024
5ece376
fix lint errors
BinaryFissionGames Apr 10, 2024
4a2d18a
make goporto
BinaryFissionGames Apr 15, 2024
af558b1
fix incorrect tracking issue
BinaryFissionGames Apr 16, 2024
df759d4
Rework interface based on feedback
BinaryFissionGames Apr 17, 2024
c31315c
callback -> listener
BinaryFissionGames Apr 17, 2024
0aa8fba
Revert "callback -> listener"
BinaryFissionGames Apr 17, 2024
9f7de59
Revert "Rework interface based on feedback"
BinaryFissionGames Apr 17, 2024
bcb17e3
refactor to a loose unregister function
BinaryFissionGames Apr 17, 2024
d506ea7
comment/naming cleanup
BinaryFissionGames Apr 17, 2024
07597d3
Refactor into suggested interface
BinaryFissionGames Apr 23, 2024
448cf87
fix typo in readme example
BinaryFissionGames Apr 23, 2024
1847e41
fix spelling error in comment
BinaryFissionGames Apr 23, 2024
2629cf8
Fix readme indentation
BinaryFissionGames Apr 24, 2024
ff42ce1
Fix comments, unexport withMaxQueuedMessages
BinaryFissionGames Apr 24, 2024
281ee8d
add implementation assertions
BinaryFissionGames Apr 24, 2024
0e3d8b1
rename customeMessageSender -> customMessageHandler
BinaryFissionGames Apr 24, 2024
1f8921d
Check channels are empty in multi capability test
BinaryFissionGames Apr 24, 2024
5166988
newCustomMessageSender -> newCustomMessageHandler
BinaryFissionGames Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .chloggen/feat_opamp-extension-custom-messages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added support for other components to register custom capabilities and receive custom messages from an opamp extension"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32021]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
24 changes: 24 additions & 0 deletions extension/opampextension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,30 @@ extensions:
endpoint: wss://127.0.0.1:4320/v1/opamp
```

## Custom Messages

Other components may use a configured OpAMP extension to send and receive custom messages to and from an OpAMP server. Components may use the provided `components.Host` from the Start method in order to get a handle to the registry:

```go
func Start(_ context.Context, host component.Host) error {
ext, ok := host.GetExtensions()[opampExtensionID]
if !ok {
return fmt.Errorf("opamp extension %q does not exist", opampExtensionID)
}
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved

registry, ok := ext.(opampextension.CustomCapabilityRegistry)
if !ok {
return fmt.Errorf("extension %q is not a custom message registry", opampExtensionID)
}

// You can now use registry.Register to register a custom capability

return nil
}
```

See the [custom_messages.go](./custom_messages.go) for more information on the custom message API.

## Status

This OpenTelemetry OpAMP agent extension is intended to support the [OpAMP
Expand Down
62 changes: 62 additions & 0 deletions extension/opampextension/custom_messages.go
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension"

import "github.com/open-telemetry/opamp-go/protobufs"

// customCapabilityRegisterOptions represents extra options that can be use in CustomCapabilityRegistry.Register
type customCapabilityRegisterOptions struct {
MaxQueuedMessages int
}

// defaultCustomCapabilityRegisterOptions returns the default options for CustomCapabilityRegisterOptions
func defaultCustomCapabilityRegisterOptions() *customCapabilityRegisterOptions {
return &customCapabilityRegisterOptions{
MaxQueuedMessages: 10,
}
}

// CustomCapabilityRegisterOption represent a single option for CustomCapabilityRegistry.Register
BinaryFissionGames marked this conversation as resolved.
Show resolved Hide resolved
type CustomCapabilityRegisterOption func(*customCapabilityRegisterOptions)

// withMaxQueuedMessages overrides the maximum number of queued messages. If a message is received while
// MaxQueuedMessages messages are already queued to be processed, the message is dropped.
func withMaxQueuedMessages(maxQueuedMessages int) CustomCapabilityRegisterOption {
return func(c *customCapabilityRegisterOptions) {
c.MaxQueuedMessages = maxQueuedMessages
}
}

// CustomCapabilityRegistry allows for registering a custom capability that can receive custom messages.
type CustomCapabilityRegistry interface {
// Register registers a new custom capability.
// It returns a CustomMessageHandler, which can be used to send and receive
// messages to/from the OpAMP server.
Register(capability string, opts ...CustomCapabilityRegisterOption) (handler CustomCapabilityHandler, err error)
}

// CustomCapabilityHandler represents a handler for a custom capability.
// This can be used to send and receive custom messages to/from an OpAMP server.
// It can also be used to unregister the custom capability when it is no longer supported.
type CustomCapabilityHandler interface {
// Message returns a channel that can be used to receive custom messages sent from the OpAMP server.
Message() <-chan *protobufs.CustomMessage

// SendMessage sends a custom message to the OpAMP server.
//
// Only one message can be sent at a time. If SendCustomMessage has been already called
// for any capability, and the message is still pending (in progress)
// then subsequent calls to SendCustomMessage will return github.com/open-telemetry/opamp-go/types.ErrCustomMessagePending
// and a channel that will be closed when the pending message is sent.
// To ensure that the previous send is complete and it is safe to send another CustomMessage,
// the caller should wait for the returned channel to be closed before attempting to send another custom message.
//
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error)

// Unregister unregisters the custom capability. After this method returns, SendMessage will always return an error,
// and Message will no longer receive further custom messages.
Unregister()
}
42 changes: 27 additions & 15 deletions extension/opampextension/opamp_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ type opampAgent struct {
agentDescription *protobufs.AgentDescription

opampClient client.OpAMPClient

customCapabilityRegistry *customCapabilityRegistry
}

var _ CustomCapabilityRegistry = (*opampAgent)(nil)

func (o *opampAgent) Start(ctx context.Context, _ component.Host) error {
header := http.Header{}
for k, v := range o.cfg.Server.GetHeaders() {
Expand Down Expand Up @@ -121,6 +125,10 @@ func (o *opampAgent) NotifyConfig(ctx context.Context, conf *confmap.Conf) error
return nil
}

func (o *opampAgent) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) {
return o.customCapabilityRegistry.Register(capability, opts...)
}

func (o *opampAgent) updateEffectiveConfig(conf *confmap.Conf) {
o.eclk.Lock()
defer o.eclk.Unlock()
Expand Down Expand Up @@ -162,14 +170,16 @@ func newOpampAgent(cfg *Config, logger *zap.Logger, build component.BuildInfo, r
}
}

opampClient := cfg.Server.GetClient(logger)
agent := &opampAgent{
cfg: cfg,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: cfg.Server.GetClient(logger),
cfg: cfg,
logger: logger,
agentType: agentType,
agentVersion: agentVersion,
instanceID: uid,
capabilities: cfg.Capabilities,
opampClient: opampClient,
customCapabilityRegistry: newCustomCapabilityRegistry(logger, opampClient),
}

return agent, nil
Expand Down Expand Up @@ -256,15 +266,17 @@ func (o *opampAgent) composeEffectiveConfig() *protobufs.EffectiveConfig {
}

func (o *opampAgent) onMessage(_ context.Context, msg *types.MessageData) {
if msg.AgentIdentification == nil {
return
}
if msg.AgentIdentification != nil {
instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid)
if err != nil {
o.logger.Error("Failed to parse a new agent identity", zap.Error(err))
return
}

instanceID, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid)
if err != nil {
o.logger.Error("Failed to parse a new agent identity", zap.Error(err))
return
o.updateAgentIdentity(instanceID)
}

o.updateAgentIdentity(instanceID)
if msg.CustomMessage != nil {
o.customCapabilityRegistry.ProcessMessage(msg.CustomMessage)
}
}
205 changes: 205 additions & 0 deletions extension/opampextension/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package opampextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension"

import (
"container/list"
"errors"
"fmt"
"slices"
"sync"

"github.com/open-telemetry/opamp-go/protobufs"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// customCapabilityClient is a subset of OpAMP client containing only the methods needed for the customCapabilityRegistry.
type customCapabilityClient interface {
SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)
}

type customCapabilityRegistry struct {
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
mux *sync.Mutex
capabilityToMsgChannels map[string]*list.List
client customCapabilityClient
logger *zap.Logger
}

var _ CustomCapabilityRegistry = (*customCapabilityRegistry)(nil)

func newCustomCapabilityRegistry(logger *zap.Logger, client customCapabilityClient) *customCapabilityRegistry {
return &customCapabilityRegistry{
mux: &sync.Mutex{},
capabilityToMsgChannels: make(map[string]*list.List),
client: client,
logger: logger,
}
}

// Register implements CustomCapabilityRegistry.Register
func (cr *customCapabilityRegistry) Register(capability string, opts ...CustomCapabilityRegisterOption) (CustomCapabilityHandler, error) {
optsStruct := defaultCustomCapabilityRegisterOptions()
for _, opt := range opts {
opt(optsStruct)
}

cr.mux.Lock()
defer cr.mux.Unlock()

capabilities := cr.capabilities()
if !slices.Contains(capabilities, capability) {
capabilities = append(capabilities, capability)
}

err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
return nil, fmt.Errorf("set custom capabilities: %w", err)
}

capabilityList := cr.capabilityToMsgChannels[capability]
if capabilityList == nil {
capabilityList = list.New()
cr.capabilityToMsgChannels[capability] = capabilityList
}

msgChan := make(chan *protobufs.CustomMessage, optsStruct.MaxQueuedMessages)
callbackElem := capabilityList.PushBack(msgChan)

unregisterFunc := cr.removeCapabilityFunc(capability, callbackElem)
sender := newCustomMessageHandler(cr, cr.client, capability, msgChan, unregisterFunc)

return sender, nil
}

// ProcessMessage processes a custom message, asynchronously broadcasting it to all registered capability handlers for
// the messages capability.
func (cr customCapabilityRegistry) ProcessMessage(cm *protobufs.CustomMessage) {
cr.mux.Lock()
defer cr.mux.Unlock()

msgChannels, ok := cr.capabilityToMsgChannels[cm.Capability]
if !ok {
return
}

for node := msgChannels.Front(); node != nil; node = node.Next() {
msgChan, ok := node.Value.(chan *protobufs.CustomMessage)
if !ok {
continue
}

// If the channel is full, we will skip sending the message to the receiver.
// We do this because we don't want a misbehaving component to be able to
// block the opamp extension, or block other components from receiving messages.
select {
case msgChan <- cm:
default:
}
}
}

// removeCapabilityFunc returns a func that removes the custom capability with the given msg channel list element and sender,
// then recalculates and sets the list of custom capabilities on the OpAMP client.
func (cr *customCapabilityRegistry) removeCapabilityFunc(capability string, callbackElement *list.Element) func() {
return func() {
cr.mux.Lock()
defer cr.mux.Unlock()

msgChanList := cr.capabilityToMsgChannels[capability]
msgChanList.Remove(callbackElement)

if msgChanList.Front() == nil {
// Since there are no more callbacks for this capability,
// this capability is no longer supported
delete(cr.capabilityToMsgChannels, capability)
}

capabilities := cr.capabilities()
err := cr.client.SetCustomCapabilities(&protobufs.CustomCapabilities{
Capabilities: capabilities,
})
if err != nil {
// It's OK if we couldn't actually remove the capability, it just means we won't
// notify the server properly, and the server may send us messages that we have no associated callbacks for.
cr.logger.Error("Failed to set new capabilities", zap.Error(err))
}
}
}

// capabilities gives the current set of custom capabilities with at least one
// callback registered.
func (cr *customCapabilityRegistry) capabilities() []string {
return maps.Keys(cr.capabilityToMsgChannels)
}

type customMessageHandler struct {
// unregisteredMux protects unregistered, and makes sure that a message cannot be sent
// on an unregistered capability.
unregisteredMux *sync.Mutex

capability string
opampClient customCapabilityClient
registry *customCapabilityRegistry
sendChan <-chan *protobufs.CustomMessage
unregisterCapabilityFunc func()

unregistered bool
}

var _ CustomCapabilityHandler = (*customMessageHandler)(nil)

func newCustomMessageHandler(
registry *customCapabilityRegistry,
opampClient customCapabilityClient,
capability string,
sendChan <-chan *protobufs.CustomMessage,
unregisterCapabilityFunc func(),
) *customMessageHandler {
return &customMessageHandler{
unregisteredMux: &sync.Mutex{},

capability: capability,
opampClient: opampClient,
registry: registry,
sendChan: sendChan,
unregisterCapabilityFunc: unregisterCapabilityFunc,
}
}

// Message implements CustomCapabilityHandler.Message
func (c *customMessageHandler) Message() <-chan *protobufs.CustomMessage {
return c.sendChan
}

// SendMessage implements CustomCapabilityHandler.SendMessage
func (c *customMessageHandler) SendMessage(messageType string, message []byte) (messageSendingChannel chan struct{}, err error) {
c.unregisteredMux.Lock()
defer c.unregisteredMux.Unlock()

if c.unregistered {
return nil, errors.New("capability has already been unregistered")
}

cm := &protobufs.CustomMessage{
Capability: c.capability,
Type: messageType,
Data: message,
}

return c.opampClient.SendCustomMessage(cm)
}

// Unregister implements CustomCapabilityHandler.Unregister
func (c *customMessageHandler) Unregister() {
c.unregisteredMux.Lock()
defer c.unregisteredMux.Unlock()

c.unregistered = true

c.unregisterCapabilityFunc()
}
Loading