Skip to content

Commit

Permalink
Add implementation of new AvailableComponents message
Browse files Browse the repository at this point in the history
  • Loading branch information
mrsillydog committed Jan 10, 2025
1 parent e6fac32 commit f6b5058
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 12 deletions.
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,9 @@ type OpAMPClient interface {
// If no error is returned, the channel returned will be closed after the specified
// message is sent.
SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error)

// SetAvailableComponents modifies the set of components that are available for configuration
// on the agent.
// TODO more documentation
SetAvailableComponents(components *protobufs.AvailableComponents) error
}
5 changes: 5 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)

Check warning on line 125 in client/httpclient.go

View check run for this annotation

Codecov / codecov/patch

client/httpclient.go#L124-L125

Added lines #L124 - L125 were not covered by tests
}

func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
Expand Down
49 changes: 46 additions & 3 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ var (
ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")
ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
errReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set")
)

// ClientCommon contains the OpAMP logic that is common between WebSocket and
Expand Down Expand Up @@ -88,6 +90,10 @@ func (c *ClientCommon) PrepareStart(
return ErrHealthMissing
}

if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil {
return ErrAvailableComponentsMissing
}

Check warning on line 95 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L94-L95

Added lines #L94 - L95 were not covered by tests

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
Expand Down Expand Up @@ -212,6 +218,12 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
return err
}

// initially, do not send the full component state - just send the hash.
// full state is available on request from the server using the corresponding ServerToAgent flag
availableComponents := &protobufs.AvailableComponents{
Hash: c.ClientSyncedState.AvailableComponents().GetHash(),
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AgentDescription = c.ClientSyncedState.AgentDescription()
Expand All @@ -221,6 +233,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.Capabilities = uint64(c.Capabilities)
msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities()
msg.Flags = c.ClientSyncedState.Flags()
msg.AvailableComponents = availableComponents
},
)
return nil
Expand Down Expand Up @@ -433,3 +446,33 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess

return sendingChan, nil
}

// SetAvailableComponents sends a message to the server with the available components for the agent
func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 {
return errReportsAvailableComponentsNotSet
}

Check warning on line 454 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L451-L454

Added lines #L451 - L454 were not covered by tests

if len(components.Hash) == 0 {
return errNoAvailableComponentHash
}

Check warning on line 458 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L456-L458

Added lines #L456 - L458 were not covered by tests

// implement agent status compression, don't send the message if it hasn't changed from the previous message
availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components)

if availableComponentsChanged {
if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil {
return err
}

Check warning on line 466 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L461-L466

Added lines #L461 - L466 were not covered by tests

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = c.ClientSyncedState.AvailableComponents()
},

Check warning on line 471 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L468-L471

Added lines #L468 - L471 were not covered by tests
)

c.sender.ScheduleSend()

Check warning on line 474 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L474

Added line #L474 was not covered by tests
}

return nil

Check warning on line 477 in client/internal/clientcommon.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientcommon.go#L477

Added line #L477 was not covered by tests
}
39 changes: 31 additions & 8 deletions client/internal/clientstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ var (
errPackageStatusesMissing = errors.New("PackageStatuses is not set")
errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil")
errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set")
errAvailableComponentsMissing = errors.New("AvailableComponents is not set")
errNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty")
)

// ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to
// have access to synchronize to the Server. Six messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags.
// have access to synchronize to the Server. Seven messages can be stored in this store:
// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities, AvailableComponents and Flags.
//
// See OpAMP spec for more details on how status reporting works:
// https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting
Expand All @@ -34,12 +36,13 @@ var (
type ClientSyncedState struct {
mutex sync.Mutex

agentDescription *protobufs.AgentDescription
health *protobufs.ComponentHealth
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
flags protobufs.AgentToServerFlags
agentDescription *protobufs.AgentDescription
health *protobufs.ComponentHealth
remoteConfigStatus *protobufs.RemoteConfigStatus
packageStatuses *protobufs.PackageStatuses
customCapabilities *protobufs.CustomCapabilities
availableComponents *protobufs.AvailableComponents
flags protobufs.AgentToServerFlags
}

func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription {
Expand Down Expand Up @@ -72,6 +75,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities {
return s.customCapabilities
}

func (s *ClientSyncedState) AvailableComponents() *protobufs.AvailableComponents {
defer s.mutex.Unlock()
s.mutex.Lock()
return s.availableComponents
}

func (s *ClientSyncedState) Flags() uint64 {
defer s.mutex.Unlock()
s.mutex.Lock()
Expand Down Expand Up @@ -176,6 +185,20 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool {
return false
}

func (s *ClientSyncedState) SetAvailableComponents(components *protobufs.AvailableComponents) error {
if components == nil {
return errAvailableComponentsMissing
}

Check warning on line 191 in client/internal/clientstate.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientstate.go#L188-L191

Added lines #L188 - L191 were not covered by tests

clone := proto.Clone(components).(*protobufs.AvailableComponents)

defer s.mutex.Unlock()
s.mutex.Lock()
s.availableComponents = clone

return nil

Check warning on line 199 in client/internal/clientstate.go

View check run for this annotation

Codecov / codecov/patch

client/internal/clientstate.go#L193-L199

Added lines #L193 - L199 were not covered by tests
}

// SetFlags sets the flags in the state.
func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) {
defer s.mutex.Unlock()
Expand Down
10 changes: 10 additions & 0 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (r *receivedProcessor) rcvFlags(
msg.PackageStatuses = r.clientSyncedState.PackageStatuses()
msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities()
msg.Flags = r.clientSyncedState.Flags()
msg.AvailableComponents = r.clientSyncedState.AvailableComponents()

// The logic for EffectiveConfig is similar to the previous 6 sub-messages however
// the EffectiveConfig is fetched using GetEffectiveConfig instead of
Expand All @@ -207,6 +208,15 @@ func (r *receivedProcessor) rcvFlags(
scheduleSend = true
}

if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents != 0 {
r.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
msg.AvailableComponents = r.clientSyncedState.AvailableComponents()
},

Check warning on line 215 in client/internal/receivedprocessor.go

View check run for this annotation

Codecov / codecov/patch

client/internal/receivedprocessor.go#L212-L215

Added lines #L212 - L215 were not covered by tests
)
scheduleSend = true

Check warning on line 217 in client/internal/receivedprocessor.go

View check run for this annotation

Codecov / codecov/patch

client/internal/receivedprocessor.go#L217

Added line #L217 was not covered by tests
}

return scheduleSend, nil
}

Expand Down
3 changes: 3 additions & 0 deletions client/types/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type MessageData struct {

// CustomMessage contains a custom message sent by the server.
CustomMessage *protobufs.CustomMessage

// Flags contains any flags sent by the server.
Flags *protobufs.AgentToServerFlags
}

// Callbacks contains functions that are executed when the client encounters
Expand Down
5 changes: 5 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS
return c.common.SendCustomMessage(message)
}

// SetAvailableComponents implements OpAMPClient.SetAvailableComponents
func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error {
return c.common.SetAvailableComponents(components)

Check warning on line 162 in client/wsclient.go

View check run for this annotation

Codecov / codecov/patch

client/wsclient.go#L161-L162

Added lines #L161 - L162 were not covered by tests
}

func viaReq(resps []*http.Response) []*http.Request {
reqs := make([]*http.Request, 0, len(resps))
for _, resp := range resps {
Expand Down
2 changes: 1 addition & 1 deletion client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestVerifyWSCompress(t *testing.T) {
remoteCfg := &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": &protobufs.AgentConfigFile{
"": {
Body: uncompressedCfg,
},
},
Expand Down

0 comments on commit f6b5058

Please sign in to comment.