From f6b505848f501266e646b6935a8904cc4873bfce Mon Sep 17 00:00:00 2001 From: Ian Adams Date: Fri, 10 Jan 2025 12:01:35 -0500 Subject: [PATCH] Add implementation of new AvailableComponents message --- client/client.go | 5 +++ client/httpclient.go | 5 +++ client/internal/clientcommon.go | 49 ++++++++++++++++++++++++++-- client/internal/clientstate.go | 39 +++++++++++++++++----- client/internal/receivedprocessor.go | 10 ++++++ client/types/callbacks.go | 3 ++ client/wsclient.go | 5 +++ client/wsclient_test.go | 2 +- 8 files changed, 106 insertions(+), 12 deletions(-) diff --git a/client/client.go b/client/client.go index afc08315..5917bd3a 100644 --- a/client/client.go +++ b/client/client.go @@ -134,4 +134,9 @@ type OpAMPClient interface { // If no error is returned, the channel returned will be closed after the specified // message is sent. SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + + // SetAvailableComponents modifies the set of components that are available for configuration + // on the agent. + // TODO more documentation + SetAvailableComponents(components *protobufs.AvailableComponents) error } diff --git a/client/httpclient.go b/client/httpclient.go index 92259d59..8d32797d 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -120,6 +120,11 @@ func (c *httpClient) SendCustomMessage(message *protobufs.CustomMessage) (messag return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *httpClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func (c *httpClient) runUntilStopped(ctx context.Context) { // Start the HTTP sender. This will make request/responses with retries for // failures and will wait with configured polling interval if there is nothing diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 283e3b01..47ed75ce 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -20,10 +20,12 @@ var ( ErrReportsRemoteConfigNotSet = errors.New("ReportsRemoteConfig capability is not set") ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") + ErrAvailableComponentsMissing = errors.New("AvailableComponents is nil") - errAlreadyStarted = errors.New("already started") - errCannotStopNotStarted = errors.New("cannot stop because not started") - errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") + errAlreadyStarted = errors.New("already started") + errCannotStopNotStarted = errors.New("cannot stop because not started") + errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") + errReportsAvailableComponentsNotSet = errors.New("ReportsAvailableComponents capability is not set") ) // ClientCommon contains the OpAMP logic that is common between WebSocket and @@ -88,6 +90,10 @@ func (c *ClientCommon) PrepareStart( return ErrHealthMissing } + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents != 0 && c.ClientSyncedState.AvailableComponents() == nil { + return ErrAvailableComponentsMissing + } + // Prepare remote config status. if settings.RemoteConfigStatus == nil { // RemoteConfigStatus is not provided. Start with empty. @@ -212,6 +218,12 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { return err } + // initially, do not send the full component state - just send the hash. + // full state is available on request from the server using the corresponding ServerToAgent flag + availableComponents := &protobufs.AvailableComponents{ + Hash: c.ClientSyncedState.AvailableComponents().GetHash(), + } + c.sender.NextMessage().Update( func(msg *protobufs.AgentToServer) { msg.AgentDescription = c.ClientSyncedState.AgentDescription() @@ -221,6 +233,7 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error { msg.Capabilities = uint64(c.Capabilities) msg.CustomCapabilities = c.ClientSyncedState.CustomCapabilities() msg.Flags = c.ClientSyncedState.Flags() + msg.AvailableComponents = availableComponents }, ) return nil @@ -433,3 +446,33 @@ func (c *ClientCommon) SendCustomMessage(message *protobufs.CustomMessage) (mess return sendingChan, nil } + +// SetAvailableComponents sends a message to the server with the available components for the agent +func (c *ClientCommon) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if c.Capabilities&protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents == 0 { + return errReportsAvailableComponentsNotSet + } + + if len(components.Hash) == 0 { + return errNoAvailableComponentHash + } + + // implement agent status compression, don't send the message if it hasn't changed from the previous message + availableComponentsChanged := !proto.Equal(c.ClientSyncedState.AvailableComponents(), components) + + if availableComponentsChanged { + if err := c.ClientSyncedState.SetAvailableComponents(components); err != nil { + return err + } + + c.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = c.ClientSyncedState.AvailableComponents() + }, + ) + + c.sender.ScheduleSend() + } + + return nil +} diff --git a/client/internal/clientstate.go b/client/internal/clientstate.go index 93250c9f..fc25866b 100644 --- a/client/internal/clientstate.go +++ b/client/internal/clientstate.go @@ -14,11 +14,13 @@ var ( errPackageStatusesMissing = errors.New("PackageStatuses is not set") errServerProvidedAllPackagesHashNil = errors.New("ServerProvidedAllPackagesHash is nil") errCustomCapabilitiesMissing = errors.New("CustomCapabilities is not set") + errAvailableComponentsMissing = errors.New("AvailableComponents is not set") + errNoAvailableComponentHash = errors.New("AvailableComponents.Hash is empty") ) // ClientSyncedState stores the state of the Agent messages that the OpAMP Client needs to -// have access to synchronize to the Server. Six messages can be stored in this store: -// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities and Flags. +// have access to synchronize to the Server. Seven messages can be stored in this store: +// AgentDescription, ComponentHealth, RemoteConfigStatus, PackageStatuses, CustomCapabilities, AvailableComponents and Flags. // // See OpAMP spec for more details on how status reporting works: // https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#status-reporting @@ -34,12 +36,13 @@ var ( type ClientSyncedState struct { mutex sync.Mutex - agentDescription *protobufs.AgentDescription - health *protobufs.ComponentHealth - remoteConfigStatus *protobufs.RemoteConfigStatus - packageStatuses *protobufs.PackageStatuses - customCapabilities *protobufs.CustomCapabilities - flags protobufs.AgentToServerFlags + agentDescription *protobufs.AgentDescription + health *protobufs.ComponentHealth + remoteConfigStatus *protobufs.RemoteConfigStatus + packageStatuses *protobufs.PackageStatuses + customCapabilities *protobufs.CustomCapabilities + availableComponents *protobufs.AvailableComponents + flags protobufs.AgentToServerFlags } func (s *ClientSyncedState) AgentDescription() *protobufs.AgentDescription { @@ -72,6 +75,12 @@ func (s *ClientSyncedState) CustomCapabilities() *protobufs.CustomCapabilities { return s.customCapabilities } +func (s *ClientSyncedState) AvailableComponents() *protobufs.AvailableComponents { + defer s.mutex.Unlock() + s.mutex.Lock() + return s.availableComponents +} + func (s *ClientSyncedState) Flags() uint64 { defer s.mutex.Unlock() s.mutex.Lock() @@ -176,6 +185,20 @@ func (s *ClientSyncedState) HasCustomCapability(capability string) bool { return false } +func (s *ClientSyncedState) SetAvailableComponents(components *protobufs.AvailableComponents) error { + if components == nil { + return errAvailableComponentsMissing + } + + clone := proto.Clone(components).(*protobufs.AvailableComponents) + + defer s.mutex.Unlock() + s.mutex.Lock() + s.availableComponents = clone + + return nil +} + // SetFlags sets the flags in the state. func (s *ClientSyncedState) SetFlags(flags protobufs.AgentToServerFlags) { defer s.mutex.Unlock() diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index aee05a81..55f2ee55 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -197,6 +197,7 @@ func (r *receivedProcessor) rcvFlags( msg.PackageStatuses = r.clientSyncedState.PackageStatuses() msg.CustomCapabilities = r.clientSyncedState.CustomCapabilities() msg.Flags = r.clientSyncedState.Flags() + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() // The logic for EffectiveConfig is similar to the previous 6 sub-messages however // the EffectiveConfig is fetched using GetEffectiveConfig instead of @@ -207,6 +208,15 @@ func (r *receivedProcessor) rcvFlags( scheduleSend = true } + if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportAvailableComponents != 0 { + r.sender.NextMessage().Update( + func(msg *protobufs.AgentToServer) { + msg.AvailableComponents = r.clientSyncedState.AvailableComponents() + }, + ) + scheduleSend = true + } + return scheduleSend, nil } diff --git a/client/types/callbacks.go b/client/types/callbacks.go index 48d5f832..e12cda58 100644 --- a/client/types/callbacks.go +++ b/client/types/callbacks.go @@ -45,6 +45,9 @@ type MessageData struct { // CustomMessage contains a custom message sent by the server. CustomMessage *protobufs.CustomMessage + + // Flags contains any flags sent by the server. + Flags *protobufs.AgentToServerFlags } // Callbacks contains functions that are executed when the client encounters diff --git a/client/wsclient.go b/client/wsclient.go index f19d8ab4..fdacdd4b 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -157,6 +157,11 @@ func (c *wsClient) SendCustomMessage(message *protobufs.CustomMessage) (messageS return c.common.SendCustomMessage(message) } +// SetAvailableComponents implements OpAMPClient.SetAvailableComponents +func (c *wsClient) SetAvailableComponents(components *protobufs.AvailableComponents) error { + return c.common.SetAvailableComponents(components) +} + func viaReq(resps []*http.Response) []*http.Request { reqs := make([]*http.Request, 0, len(resps)) for _, resp := range resps { diff --git a/client/wsclient_test.go b/client/wsclient_test.go index 436ceb55..62be0b43 100644 --- a/client/wsclient_test.go +++ b/client/wsclient_test.go @@ -257,7 +257,7 @@ func TestVerifyWSCompress(t *testing.T) { remoteCfg := &protobufs.AgentRemoteConfig{ Config: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ - "": &protobufs.AgentConfigFile{ + "": { Body: uncompressedCfg, }, },