diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 4ec5034f785e..d8477815c07a 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -1,14 +1,14 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -//go:build e2e - package main import ( + "bufio" "bytes" "context" "crypto/sha256" + "encoding/json" "errors" "fmt" "io" @@ -40,10 +40,12 @@ import ( "github.com/stretchr/testify/require" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/telemetry" ) var _ clientTypes.Logger = testLogger{} @@ -1354,6 +1356,78 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { } +type LogEntry struct { + Level string `json:"level"` +} + +func TestSupervisorInfoLoggingLevel(t *testing.T) { + storageDir := t.TempDir() + remoteCfgFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat") + + collectorCfg, hash, _, _ := createSimplePipelineCollectorConf(t) + remoteCfgProto := &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: collectorCfg.Bytes()}, + }, + }, + ConfigHash: hash, + } + marshalledRemoteCfg, err := proto.Marshal(remoteCfgProto) + require.NoError(t, err) + require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0600)) + + connected := atomic.Bool{} + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ + OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + connected.Store(true) + }, + }) + defer server.shutdown() + + supervisorLogFilePath := filepath.Join(storageDir, "supervisor_log.log") + cfgFile := getSupervisorConfig(t, "logging", map[string]string{ + "url": server.addr, + "storage_dir": storageDir, + "log_level": "0", + "log_file": supervisorLogFilePath, + }) + + cfg, err := config.Load(cfgFile.Name()) + require.NoError(t, err) + logger, err := telemetry.NewLogger(cfg.Telemetry.Logs) + require.NoError(t, err) + + s, err := supervisor.NewSupervisor(logger, cfg) + require.NoError(t, err) + require.Nil(t, s.Start()) + + // Start the server and wait for the supervisor to connect + server.start() + waitForSupervisorConnection(server.supervisorConnected, true) + require.True(t, connected.Load(), "Supervisor failed to connect") + + s.Shutdown() + + // Read from log file checking for Info level logs + logFile, err := os.Open(supervisorLogFilePath) + require.NoError(t, err) + defer logFile.Close() + + scanner := bufio.NewScanner(logFile) + + for scanner.Scan() { + line := scanner.Bytes() + var log LogEntry + err := json.Unmarshal(line, &log) + require.NoError(t, err) + + level, err := zapcore.ParseLevel(log.Level) + require.NoError(t, err) + require.GreaterOrEqual(t, level, zapcore.InfoLevel) + } +} + func findRandomPort() (int, error) { l, err := net.Listen("tcp", "localhost:0") diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 3a7f384db3f5..16d584abeb37 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -237,25 +237,3 @@ func DefaultSupervisor() Supervisor { }, } } - -func LoadConfig(configFile string) (Supervisor, error) { - if configFile == "" { - return Supervisor{}, errors.New("path to config file cannot be empty") - } - - k := koanf.New("::") - if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil { - return Supervisor{}, err - } - - decodeConf := koanf.UnmarshalConf{ - Tag: "mapstructure", - } - - cfg := DefaultSupervisor() - if err := k.UnmarshalWithConf("", &cfg, decodeConf); err != nil { - return Supervisor{}, fmt.Errorf("cannot parse %v: %w", configFile, err) - } - - return cfg, nil -} diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 0c5f85abc6fb..8d683e5b09b4 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -150,7 +150,6 @@ type Supervisor struct { func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) { s := &Supervisor{ - config: cfg, logger: logger, pidProvider: defaultPIDProvider{}, hasNewConfig: make(chan struct{}, 1), @@ -200,7 +199,7 @@ func (s *Supervisor) Start() error { s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort) - s.logger.Debug("Supervisor starting", + s.logger.Info("Supervisor starting", zap.String("id", s.persistentState.InstanceID.String())) err = s.loadAndWriteInitialMergedConfig() diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml new file mode 100644 index 000000000000..dc6153dfb86c --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml @@ -0,0 +1,21 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + accepts_restart_command: true + +storage: + directory: '{{.storage_dir}}' + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} + +telemetry: + logs: + level: {{.log_level}} # info level logs + output_paths: ['{{.log_file}}']