diff --git a/.chloggen/feat_opampsupervisor-start-stop-empty-confmap.yaml b/.chloggen/feat_opampsupervisor-start-stop-empty-confmap.yaml new file mode 100644 index 000000000000..2405e0f53480 --- /dev/null +++ b/.chloggen/feat_opampsupervisor-start-stop-empty-confmap.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Skip executing the collector if no config is provided + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33680] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index a6f41243e542..18fcc4c17e3e 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -349,20 +349,10 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { require.Nil(t, s.Start()) defer s.Shutdown() - // Verify the collector is running by checking the metrics endpoint - require.Eventually(t, func() bool { - resp, err := http.DefaultClient.Get("http://localhost:12345") - if err != nil { - t.Logf("Failed agent healthcheck request: %s", err) - return false - } - require.NoError(t, resp.Body.Close()) - if resp.StatusCode >= 300 || resp.StatusCode < 200 { - t.Logf("Got non-2xx status code: %d", resp.StatusCode) - return false - } - return true - }, 3*time.Second, 100*time.Millisecond) + // Verify the collector is not running after 250 ms by checking the healthcheck endpoint + time.Sleep(250 * time.Millisecond) + _, err := http.DefaultClient.Get("http://localhost:12345") + require.ErrorContains(t, err, "connection refused") // Start the server and wait for the supervisor to connect server.start() @@ -1266,6 +1256,95 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) { require.FileExists(t, filepath.Join(storageDir, "effective.yaml")) } +func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { + agentCfgChan := make(chan string, 1) + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + select { + case agentCfgChan <- string(config.Body): + default: + } + } + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "healthcheck_port", map[string]string{ + "url": server.addr, + "healthcheck_port": "12345", + }) + + require.Nil(t, s.Start()) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + cfg, hash, _, _ := createSimplePipelineCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + select { + case <-agentCfgChan: + case <-time.After(1 * time.Second): + require.FailNow(t, "timed out waitng for agent to report its initial config") + } + + // Use health check endpoint to determine if the collector is actually running + require.Eventually(t, func() bool { + resp, err := http.DefaultClient.Get("http://localhost:12345") + if err != nil { + t.Logf("Failed agent healthcheck request: %s", err) + return false + } + require.NoError(t, resp.Body.Close()) + if resp.StatusCode >= 300 || resp.StatusCode < 200 { + t.Logf("Got non-2xx status code: %d", resp.StatusCode) + return false + } + return true + }, 3*time.Second, 100*time.Millisecond) + + // Send empty config + emptyHash := sha256.Sum256([]byte{}) + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{}, + }, + ConfigHash: emptyHash[:], + }, + }) + + select { + case <-agentCfgChan: + case <-time.After(1 * time.Second): + require.FailNow(t, "timed out waitng for agent to report its noop config") + } + + // Verify the collector is not running after 250 ms by checking the healthcheck endpoint + time.Sleep(250 * time.Millisecond) + _, err := http.DefaultClient.Get("http://localhost:12345") + require.ErrorContains(t, err, "connection refused") + +} + func findRandomPort() (int, error) { l, err := net.Listen("tcp", "localhost:0") diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index e30dfe883471..a45d7b6d38d1 100644 --- a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -166,9 +166,9 @@ agent: ### Operation When OpAMP Server is Unavailable When the supervisor cannot connect to the OpAMP server, the collector will -be run with the last known configuration, or with a "noop" configuration -if no previous configuration is persisted. The supervisor will continually -attempt to reconnect to the OpAMP server with exponential backoff. +be run with the last known configuration if a previous configuration is persisted. +If no previous configuration has been persisted, the collector does not run. +The supervisor will continually attempt to reconnect to the OpAMP server with exponential backoff. ### Executing Collector @@ -204,6 +204,10 @@ Configuration*](https://github.com/open-telemetry/opamp-spec/blob/main/specifica from the OpAMP Backend, merges it with an optional local config file and writes it to the Collector's config file, then restarts the Collector. +If the remote configuration from the OpAMP Backend contains an empty config map, +the collector will be stopped and will not be run again until a non-empty config map +is received from the OpAMP Backend. + In the future once config file watching is implemented the Collector can reload the config without the need for the Supervisor to restart the Collector process. @@ -244,13 +248,13 @@ configuration. To overcome this problem the Supervisor starts the Collector with an "noop" configuration that collects nothing but allows the opamp extension to be started. The "noop" configuration is a single pipeline -with an OTLP receiver that listens on a random port and a debug -exporter, and the opamp extension. The purpose of the "noop" -configuration is to make sure the Collector starts and the opamp -extension communicates with the Supervisor. +with an nop receiver, a nop exporter, and the opamp extension. +The purpose of the "noop" configuration is to make sure the Collector starts +and the opamp extension communicates with the Supervisor. The Collector is stopped +after the AgentDescription is received from the Collector. Once the initial Collector launch is successful and the remote -configuration is received by the Supervisor the Supervisor restarts the +configuration is received by the Supervisor the Supervisor starts the Collector with the new config. The new config is also cached by the Supervisor in a local file, so that subsequent restarts no longer need to start the Collector using the "noop" configuration. Caching of the diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index febcde73ac98..d96a1c8fbc93 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -67,6 +67,17 @@ const ( const maxBufferedCustomMessages = 10 +type configState struct { + // Supervisor-assembled config to be given to the Collector. + mergedConfig string + // true if the server provided configmap was empty + configMapIsEmpty bool +} + +func (c *configState) equal(other *configState) bool { + return other.mergedConfig == c.mergedConfig && other.configMapIsEmpty == c.configMapIsEmpty +} + // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient // to work with an OpAMP Server. type Supervisor struct { @@ -107,8 +118,8 @@ type Supervisor struct { // will listen on for health check requests from the Supervisor. agentHealthCheckEndpoint string - // Supervisor-assembled config to be given to the Collector. - mergedConfig *atomic.Value + // Internal config state for agent use. See the configState struct for more details. + cfgState *atomic.Value // Final effective config of the Collector. effectiveConfig *atomic.Value @@ -143,7 +154,7 @@ func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, erro pidProvider: defaultPIDProvider{}, hasNewConfig: make(chan struct{}, 1), agentConfigOwnMetricsSection: &atomic.Value{}, - mergedConfig: &atomic.Value{}, + cfgState: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentDescription: &atomic.Value{}, doneChan: make(chan struct{}), @@ -793,8 +804,8 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error { } // write the initial merged config to disk - cfg := s.mergedConfig.Load().(string) - if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil { + cfgState := s.cfgState.Load().(*configState) + if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil { s.logger.Error("Failed to write agent config.", zap.Error(err)) } @@ -806,9 +817,11 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error { func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig { cfgStr, ok := s.effectiveConfig.Load().(string) if !ok { - cfgStr, ok = s.mergedConfig.Load().(string) + cfgState, ok := s.cfgState.Load().(*configState) if !ok { cfgStr = "" + } else { + cfgStr = cfgState.mergedConfig } } @@ -870,7 +883,11 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) { var k = koanf.New("::") - if c := config.GetConfig(); c != nil { + configMapIsEmpty := len(config.GetConfig().GetConfigMap()) == 0 + + if !configMapIsEmpty { + c := config.GetConfig() + // Sort to make sure the order of merging is stable. var names []string for name := range c.ConfigMap { @@ -939,11 +956,16 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c } // Check if supervisor's merged config is changed. - newMergedConfig := string(newMergedConfigBytes) + + newConfigState := &configState{ + mergedConfig: string(newMergedConfigBytes), + configMapIsEmpty: configMapIsEmpty, + } + configChanged = false - oldConfig := s.mergedConfig.Swap(newMergedConfig) - if oldConfig == nil || oldConfig.(string) != newMergedConfig { + oldConfigState := s.cfgState.Swap(newConfigState) + if oldConfigState == nil || !oldConfigState.(*configState).equal(newConfigState) { s.logger.Debug("Merged config changed.") configChanged = true } @@ -963,6 +985,12 @@ func (s *Supervisor) handleRestartCommand() error { } func (s *Supervisor) startAgent() { + if s.cfgState.Load().(*configState).configMapIsEmpty { + // Don't start the agent if there is no config to run + s.logger.Info("No config present, not starting agent.") + return + } + err := s.commander.Start(context.Background()) if err != nil { s.logger.Error("Cannot start the agent", zap.Error(err)) @@ -1104,14 +1132,14 @@ func (s *Supervisor) runAgentProcess() { func (s *Supervisor) stopAgentApplyConfig() { s.logger.Debug("Stopping the agent to apply new config") - cfg := s.mergedConfig.Load().(string) + cfgState := s.cfgState.Load().(*configState) err := s.commander.Stop(context.Background()) if err != nil { s.logger.Error("Could not stop agent process", zap.Error(err)) } - if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil { + if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil { s.logger.Error("Failed to write agent config.", zap.Error(err)) } } diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index f0cf3d1517b9..6b1a100b1010 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -104,7 +104,7 @@ func Test_composeEffectiveConfig(t *testing.T) { pidProvider: staticPIDProvider(1234), hasNewConfig: make(chan struct{}, 1), agentConfigOwnMetricsSection: &atomic.Value{}, - mergedConfig: &atomic.Value{}, + cfgState: &atomic.Value{}, agentHealthCheckEndpoint: "localhost:8000", } @@ -159,7 +159,7 @@ service: expectedConfig = bytes.ReplaceAll(expectedConfig, []byte("\r\n"), []byte("\n")) require.True(t, configChanged) - require.Equal(t, string(expectedConfig), s.mergedConfig.Load().(string)) + require.Equal(t, string(expectedConfig), s.cfgState.Load().(*configState).mergedConfig) } func Test_onMessage(t *testing.T) { @@ -176,7 +176,7 @@ func Test_onMessage(t *testing.T) { persistentState: &persistentState{InstanceID: initialID}, agentDescription: agentDesc, agentConfigOwnMetricsSection: &atomic.Value{}, - mergedConfig: &atomic.Value{}, + cfgState: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentHealthCheckEndpoint: "localhost:8000", opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())), @@ -205,7 +205,7 @@ func Test_onMessage(t *testing.T) { persistentState: &persistentState{InstanceID: testUUID}, agentDescription: agentDesc, agentConfigOwnMetricsSection: &atomic.Value{}, - mergedConfig: &atomic.Value{}, + cfgState: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentHealthCheckEndpoint: "localhost:8000", } @@ -251,7 +251,7 @@ func Test_onMessage(t *testing.T) { hasNewConfig: make(chan struct{}, 1), persistentState: &persistentState{InstanceID: testUUID}, agentConfigOwnMetricsSection: &atomic.Value{}, - mergedConfig: &atomic.Value{}, + cfgState: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentConn: agentConnAtomic, agentHealthCheckEndpoint: "localhost:8000", @@ -332,7 +332,7 @@ func Test_onMessage(t *testing.T) { persistentState: &persistentState{InstanceID: initialID}, agentDescription: agentDesc, agentConfigOwnMetricsSection: &atomic.Value{}, - mergedConfig: &atomic.Value{}, + cfgState: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentHealthCheckEndpoint: "localhost:8000", opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())), @@ -358,10 +358,11 @@ func Test_onMessage(t *testing.T) { }) require.Equal(t, newID, s.persistentState.InstanceID) - t.Log(s.mergedConfig.Load()) - require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics") - require.Contains(t, s.mergedConfig.Load(), newID.String()) - require.Contains(t, s.mergedConfig.Load(), "runtime.type: test") + t.Log(s.cfgState.Load()) + mergedCfg := s.cfgState.Load().(*configState).mergedConfig + require.Contains(t, mergedCfg, "prometheus/own_metrics") + require.Contains(t, mergedCfg, newID.String()) + require.Contains(t, mergedCfg, "runtime.type: test") }) }