diff --git a/.gitignore b/.gitignore index fe3a14592136..78e09df93481 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ *beat/logs *beat/data x-pack/functionbeat/pkg +x-pack/elastic-agent/data +x-pack/elastic-agent/pkg/agent/operation/tests/downloads # Files .DS_Store @@ -25,6 +27,9 @@ mage_output_file.go x-pack/functionbeat/*/fields.yml x-pack/functionbeat/provider/*/functionbeat-* x-pack/dockerlogbeat/temproot.tar +x-pack/elastic-agent/elastic_agent +x-pack/elastic-agent/fleet.yml +x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/configurable # Editor swap files *.swp diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index eb58d36a4549..d962207463a1 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -462,7 +462,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) // Launch config manager - b.ConfigManager.Start() + b.ConfigManager.Start(beater.Stop) defer b.ConfigManager.Stop() return beater.Run(&b.Beat) diff --git a/libbeat/management/management.go b/libbeat/management/management.go index dc793fafe2df..690c3dba7f74 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -40,7 +40,7 @@ type ConfigManager interface { Enabled() bool // Start the config manager - Start() + Start(func()) // Stop the config manager Stop() @@ -98,6 +98,6 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, err } func (nilManager) Enabled() bool { return false } -func (nilManager) Start() {} +func (nilManager) Start(_ func()) {} func (nilManager) Stop() {} func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil } diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 851cf366f06a..ae76bb8a878d 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -70,3 +70,4 @@ - Change monitoring defaults for agent {pull}18927[18927] - Agent verifies packages before using them {pull}18876[18876] - Change stream.* to dataset.* fields {pull}18967[18967] +- Agent now runs the GRPC server and spawned application connect by to Agent {pull}18973[18973] diff --git a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl index 58a82bbd3c2d..10a78b5e0521 100644 --- a/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl @@ -97,13 +97,18 @@ datasources: # install_path: "${path.data}/install" # process: -# # minimal port number for spawned processes -# min_port: 10000 -# # maximum port number for spawned processes -# max_port: 30000 # # timeout for creating new processes. when process is not successfully created by this timeout # # start operation is considered a failure # spawn_timeout: 30s +# # timeout for stopping processes. when process is not stopped by this timeout then the process. +# # is force killed +# stop_timeout: 30s + +# grpc: +# # listen address for the GRPC server that spawned processes connect back to. +# address: localhost +# # port for the GRPC server that spawned processes connect back to. +# port: 6789 # retry: # # Enabled determines whether retry is possible. Default is false. diff --git a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl index 15582908fe7b..5086a5fa253b 100644 --- a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl @@ -89,13 +89,18 @@ datasources: # install_path: "${path.data}/install" # process: -# # minimal port number for spawned processes -# min_port: 10000 -# # maximum port number for spawned processes -# max_port: 30000 # # timeout for creating new processes. when process is not successfully created by this timeout # # start operation is considered a failure # spawn_timeout: 30s +# # timeout for stopping processes. when process is not stopped by this timeout then the process. +# # is force killed +# stop_timeout: 30s + +# grpc: +# # listen address for the GRPC server that spawned processes connect back to. +# address: localhost +# # port for the GRPC server that spawned processes connect back to. +# port: 6789 # retry: # # Enabled determines whether retry is possible. Default is false. diff --git a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl index fc7edf734133..a4effcf24f8a 100644 --- a/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/elastic-agent.docker.yml.tmpl @@ -89,13 +89,18 @@ datasources: # install_path: "${path.data}/install" # process: -# # minimal port number for spawned processes -# min_port: 10000 -# # maximum port number for spawned processes -# max_port: 30000 # # timeout for creating new processes. when process is not successfully created by this timeout # # start operation is considered a failure # spawn_timeout: 30s +# # timeout for stopping processes. when process is not stopped by this timeout then the process. +# # is force killed +# stop_timeout: 30s + +# grpc: +# # listen address for the GRPC server that spawned processes connect back to. +# address: localhost +# # port for the GRPC server that spawned processes connect back to. +# port: 6789 # retry: # # Enabled determines whether retry is possible. Default is false. diff --git a/x-pack/elastic-agent/elastic-agent.docker.yml b/x-pack/elastic-agent/elastic-agent.docker.yml index fc7edf734133..a4effcf24f8a 100644 --- a/x-pack/elastic-agent/elastic-agent.docker.yml +++ b/x-pack/elastic-agent/elastic-agent.docker.yml @@ -89,13 +89,18 @@ datasources: # install_path: "${path.data}/install" # process: -# # minimal port number for spawned processes -# min_port: 10000 -# # maximum port number for spawned processes -# max_port: 30000 # # timeout for creating new processes. when process is not successfully created by this timeout # # start operation is considered a failure # spawn_timeout: 30s +# # timeout for stopping processes. when process is not stopped by this timeout then the process. +# # is force killed +# stop_timeout: 30s + +# grpc: +# # listen address for the GRPC server that spawned processes connect back to. +# address: localhost +# # port for the GRPC server that spawned processes connect back to. +# port: 6789 # retry: # # Enabled determines whether retry is possible. Default is false. diff --git a/x-pack/elastic-agent/elastic-agent.reference.yml b/x-pack/elastic-agent/elastic-agent.reference.yml index ae06f02c8168..98283027c62a 100644 --- a/x-pack/elastic-agent/elastic-agent.reference.yml +++ b/x-pack/elastic-agent/elastic-agent.reference.yml @@ -95,13 +95,18 @@ datasources: # install_path: "${path.data}/install" # process: -# # minimal port number for spawned processes -# min_port: 10000 -# # maximum port number for spawned processes -# max_port: 30000 # # timeout for creating new processes. when process is not successfully created by this timeout # # start operation is considered a failure # spawn_timeout: 30s +# # timeout for stopping processes. when process is not stopped by this timeout then the process. +# # is force killed +# stop_timeout: 30s + +# grpc: +# # listen address for the GRPC server that spawned processes connect back to. +# address: localhost +# # port for the GRPC server that spawned processes connect back to. +# port: 6789 # retry: # # Enabled determines whether retry is possible. Default is false. diff --git a/x-pack/elastic-agent/elastic-agent.yml b/x-pack/elastic-agent/elastic-agent.yml index 72ed3abc1a92..f218468b44f9 100644 --- a/x-pack/elastic-agent/elastic-agent.yml +++ b/x-pack/elastic-agent/elastic-agent.yml @@ -103,13 +103,18 @@ datasources: # install_path: "${path.data}/install" # process: -# # minimal port number for spawned processes -# min_port: 10000 -# # maximum port number for spawned processes -# max_port: 30000 # # timeout for creating new processes. when process is not successfully created by this timeout # # start operation is considered a failure # spawn_timeout: 30s +# # timeout for stopping processes. when process is not stopped by this timeout then the process. +# # is force killed +# stop_timeout: 30s + +# grpc: +# # listen address for the GRPC server that spawned processes connect back to. +# address: localhost +# # port for the GRPC server that spawned processes connect back to. +# port: 6789 # retry: # # Enabled determines whether retry is possible. Default is false. diff --git a/x-pack/elastic-agent/magefile.go b/x-pack/elastic-agent/magefile.go index 2b1610a99af9..719902e74a78 100644 --- a/x-pack/elastic-agent/magefile.go +++ b/x-pack/elastic-agent/magefile.go @@ -12,6 +12,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strings" "time" @@ -165,8 +166,12 @@ func (Build) Clean() { func (Build) TestBinaries() error { p := filepath.Join("pkg", "agent", "operation", "tests", "scripts") + binaryName := "configurable" + if runtime.GOOS == "windows" { + binaryName += ".exe" + } return combineErr( - RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86", "configurable"), filepath.Join(p, "configurable-1.0-darwin-x86", "main.go")), + RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", binaryName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")), ) } diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index 4ad8f27c7f0c..82bddf4d8dcc 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -13,7 +13,9 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir" reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log" @@ -39,6 +41,7 @@ type Local struct { log *logger.Logger source source agentInfo *info.AgentInfo + srv *server.Server } type source interface { @@ -78,6 +81,10 @@ func newLocal( } localApplication.bgContext, localApplication.cancelCtxFn = context.WithCancel(ctx) + localApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{}) + if err != nil { + return nil, errors.New(err, "initialize GRPC listener") + } reporter := reporting.NewReporter(localApplication.bgContext, log, localApplication.agentInfo, logR) @@ -86,7 +93,7 @@ func newLocal( return nil, errors.New(err, "failed to initialize monitoring") } - router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, nil, reporter, monitor)) + router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, localApplication.srv, reporter, monitor)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } @@ -113,6 +120,9 @@ func (l *Local) Start() error { l.log.Info("Agent is starting") defer l.log.Info("Agent is stopped") + if err := l.srv.Start(); err != nil { + return err + } if err := l.source.Start(); err != nil { return err } @@ -123,6 +133,7 @@ func (l *Local) Start() error { // Stop stops a local agent. func (l *Local) Stop() error { l.cancelCtxFn() + l.srv.Stop() return l.source.Stop() } diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 269287446db1..3991045518f6 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -11,6 +11,8 @@ import ( "net/http" "net/url" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -18,6 +20,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet" @@ -44,6 +47,7 @@ type Managed struct { api apiClient agentInfo *info.AgentInfo gateway *fleetGateway + srv *server.Server } func newManaged( @@ -110,6 +114,10 @@ func newManaged( } managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx) + managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{}) + if err != nil { + return nil, errors.New(err, "initialize GRPC listener") + } logR := logreporter.NewReporter(log, cfg.Reporting.Log) fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Reporting.Fleet) @@ -123,7 +131,7 @@ func newManaged( return nil, errors.New(err, "failed to initialize monitoring") } - router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter, monitor)) + router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, managedApplication.srv, combinedReporter, monitor)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } @@ -200,6 +208,9 @@ func newManaged( // Start starts a managed elastic-agent. func (m *Managed) Start() error { m.log.Info("Agent is starting") + if err := m.srv.Start(); err != nil { + return err + } m.gateway.Start() return nil } @@ -208,6 +219,7 @@ func (m *Managed) Start() error { func (m *Managed) Stop() error { defer m.log.Info("Agent is stopped") m.cancelCtxFn() + m.srv.Stop() return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index e53529c910b1..74a58b9965bc 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" ) // EventProcessor is an processor of application event @@ -56,10 +57,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error { return b.configHandler.HandleConfig(cfg) } -func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { +func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per stream to isolate processes without using tags - operator, err := newOperator(ctx, log, id, cfg, r, m) + operator, err := newOperator(ctx, log, id, cfg, srv, r, m) if err != nil { return nil, err } @@ -71,7 +72,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, client sender, r rep } } -func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter, m monitoring.Monitor) (*operation.Operator, error) { +func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) (*operation.Operator, error) { operatorConfig := operatorCfg.DefaultConfig() if err := config.Unpack(&operatorConfig); err != nil { return nil, err @@ -102,6 +103,7 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config verifier, installer, stateResolver, + srv, r, m, ) diff --git a/x-pack/elastic-agent/pkg/agent/operation/common_test.go b/x-pack/elastic-agent/pkg/agent/operation/common_test.go index 4576674f7350..ca58a54fac0a 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/common_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/common_test.go @@ -6,9 +6,14 @@ package operation import ( "context" + "os" + "path/filepath" + "runtime" "testing" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" + operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" @@ -22,9 +27,10 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" ) -var installPath = "tests/scripts" +var downloadPath = getAbsPath("tests/downloads") +var installPath = getAbsPath("tests/scripts") -func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.Config) { +func getTestOperator(t *testing.T, downloadPath string, installPath string, p *app.Descriptor) (*Operator, *operatorCfg.Config) { operatorConfig := &operatorCfg.Config{ RetryConfig: &retry.Config{ Enabled: true, @@ -34,7 +40,8 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. }, ProcessConfig: &process.Config{}, DownloadConfig: &artifact.Config{ - InstallPath: installPath, + TargetDirectory: downloadPath, + InstallPath: installPath, }, } @@ -53,8 +60,16 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. if err != nil { t.Fatal(err) } + srv, err := server.New(l, ":0", &app.ApplicationStatusHandler{}) + if err != nil { + t.Fatal(err) + } + err = srv.Start() + if err != nil { + t.Fatal(err) + } - operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, verifier, installer, stateResolver, nil, noop.NewMonitor()) + operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, verifier, installer, stateResolver, srv, nil, noop.NewMonitor()) if err != nil { t.Fatal(err) } @@ -62,11 +77,24 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. operator.config.DownloadConfig.OperatingSystem = "darwin" operator.config.DownloadConfig.Architecture = "32" + // make the download path so the `operation_verify` can ensure the path exists + downloadConfig := operator.config.DownloadConfig + fullPath, err := artifact.GetArtifactPath(p.BinaryName(), p.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory) + if err != nil { + t.Fatal(err) + } + createFile(t, fullPath) + return operator, operatorConfig } func getLogger() *logger.Logger { - l, _ := logger.New() + cfg, _ := config.NewConfigFrom(map[string]interface{}{ + "logging": map[string]interface{}{ + "level": "error", + }, + }) + l, _ := logger.NewFromConfig(cfg) return l } @@ -74,12 +102,39 @@ func getProgram(binary, version string) *app.Descriptor { downloadCfg := &artifact.Config{ InstallPath: installPath, OperatingSystem: "darwin", + Architecture: "32", } return app.NewDescriptor(binary, version, downloadCfg, nil) } -type TestConfig struct { - TestFile string +func getAbsPath(path string) string { + _, filename, _, _ := runtime.Caller(0) + return filepath.Join(filepath.Dir(filename), path) +} + +func createFile(t *testing.T, path string) { + _, err := os.Stat(path) + if os.IsNotExist(err) { + file, err := os.Create(path) + if err != nil { + t.Fatal(err) + } + defer file.Close() + } +} + +func waitFor(t *testing.T, check func() error) { + started := time.Now() + for { + err := check() + if err == nil { + return + } + if time.Now().Sub(started) >= 15*time.Second { + t.Fatalf("check timed out after 15 second: %s", err) + } + time.Sleep(10 * time.Millisecond) + } } type DummyDownloader struct{} diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index 371ddbda308d..12f9ca37d4e0 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -9,6 +9,8 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver" @@ -123,9 +125,13 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M if err != nil { t.Fatal(err) } - ctx := context.Background() + srv, err := server.New(l, ":0", &app.ApplicationStatusHandler{}) + if err != nil { + t.Fatal(err) + } - operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, verifier, installer, stateResolver, nil, m) + ctx := context.Background() + operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, verifier, installer, stateResolver, srv, nil, m) if err != nil { t.Fatal(err) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation.go b/x-pack/elastic-agent/pkg/agent/operation/operation.go index fdaa7da31d34..cfc11cceae70 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation.go @@ -23,7 +23,7 @@ type operation interface { // examples: // - Start does not need to run if process is running // - Fetch does not need to run if package is already present - Check() (bool, error) + Check(application Application) (bool, error) // Run runs the operation Run(ctx context.Context, application Application) error } @@ -34,8 +34,8 @@ type Application interface { Start(ctx context.Context, p app.Taggable, cfg map[string]interface{}) error Stop() Configure(ctx context.Context, config map[string]interface{}) error - State() state.State Monitor() monitoring.Monitor + State() state.State } // Descriptor defines a program which needs to be run. diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go index b50aef4de7a6..8fe6af3056e6 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go @@ -46,11 +46,10 @@ func (o *operationConfig) Name() string { return "operation-config" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationConfig) Check() (bool, error) { return true, nil } +// Check checks whether config needs to be run. +// +// Always returns true. +func (o *operationConfig) Check(_ Application) (bool, error) { return true, nil } // Run runs the operation func (o *operationConfig) Run(ctx context.Context, application Application) (err error) { diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go b/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go index 3c25287e1c71..bda22a861e00 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go @@ -46,11 +46,10 @@ func (o *operationFetch) Name() string { return "operation-fetch" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationFetch) Check() (bool, error) { +// Check checks whether fetch needs to occur. +// +// If the artifacts already exists then fetch will not be ran. +func (o *operationFetch) Check(_ Application) (bool, error) { downloadConfig := o.operatorConfig.DownloadConfig fullPath, err := artifact.GetArtifactPath(o.program.BinaryName(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_install.go b/x-pack/elastic-agent/pkg/agent/operation/operation_install.go index 34faf9a7fc27..0e045cb15dfb 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_install.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_install.go @@ -45,11 +45,10 @@ func (o *operationInstall) Name() string { return "operation-install" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationInstall) Check() (bool, error) { +// Check checks whether install needs to be ran. +// +// If the installation directory already exists then it will not be ran. +func (o *operationInstall) Check(_ Application) (bool, error) { installDir := o.program.Directory() _, err := os.Stat(installDir) return os.IsNotExist(err), nil diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go b/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go index fb3370ba156a..587d546bb8fd 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go @@ -24,11 +24,10 @@ func (o *operationRemove) Name() string { return "operation-remove" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationRemove) Check() (bool, error) { +// Check checks whether remove needs to run. +// +// Always returns false. +func (o *operationRemove) Check(_ Application) (bool, error) { return false, nil } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_start.go b/x-pack/elastic-agent/pkg/agent/operation/operation_start.go index 803276f68016..d6dc4f3d3892 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_start.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_start.go @@ -7,6 +7,8 @@ package operation import ( "context" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -47,14 +49,16 @@ func (o *operationStart) Name() string { return "operation-start" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationStart) Check() (bool, error) { - // TODO: get running processes and compare hashes - - return true, nil +// Check checks whether application needs to be started. +// +// Only starts the application when in stopped state, any other state +// and the application is handled by the life cycle inside of the `Application` +// implementation. +func (o *operationStart) Check(application Application) (bool, error) { + if application.State().Status == state.Stopped { + return true, nil + } + return false, nil } // Run runs the operation diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go b/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go index b6206fd0d54c..222e933b877d 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go @@ -7,6 +7,8 @@ package operation import ( "context" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" @@ -36,12 +38,14 @@ func (o *operationStop) Name() string { return "operation-stop" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationStop) Check() (bool, error) { - return true, nil +// Check checks whether application needs to be stopped. +// +// If the application state is not stopped then stop should be performed. +func (o *operationStop) Check(application Application) (bool, error) { + if application.State().Status != state.Stopped { + return true, nil + } + return false, nil } // Run runs the operation diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go b/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go index b3ffbee3dcda..63f987b748a8 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go @@ -42,11 +42,10 @@ func (o *operationVerify) Name() string { return "operation-verify" } -// Check checks whether operation needs to be run -// examples: -// - Start does not need to run if process is running -// - Fetch does not need to run if package is already present -func (o *operationVerify) Check() (bool, error) { +// Check checks whether verify needs to occur. +// +// Only if the artifacts exists does it need to be verified. +func (o *operationVerify) Check(_ Application) (bool, error) { downloadConfig := o.operatorConfig.DownloadConfig fullPath, err := artifact.GetArtifactPath(o.program.BinaryName(), o.program.Version(), downloadConfig.OS(), downloadConfig.Arch(), downloadConfig.TargetDirectory) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index 10e259bd8beb..eb80f05eb1c9 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -11,6 +11,8 @@ import ( "strings" "sync" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" @@ -22,8 +24,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - rconfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" ) const ( @@ -43,6 +44,7 @@ type Operator struct { config *operatorCfg.Config handlers map[string]handleFunc stateResolver *stateresolver.StateResolver + srv *server.Server eventProcessor callbackHooks monitor monitoring.Monitor isMonitoring int @@ -67,6 +69,7 @@ func NewOperator( verifier download.Verifier, installer install.Installer, stateResolver *stateresolver.StateResolver, + srv *server.Server, eventProcessor callbackHooks, monitor monitoring.Monitor) (*Operator, error) { @@ -92,6 +95,7 @@ func NewOperator( verifier: verifier, installer: installer, stateResolver: stateResolver, + srv: srv, apps: make(map[string]Application), eventProcessor: eventProcessor, monitor: monitor, @@ -106,7 +110,7 @@ func NewOperator( } // State describes the current state of the system. -// Reports all known beats and theirs states. Whether they are running +// Reports all known applications and theirs states. Whether they are running // or not, and if they are information about process is also present. func (o *Operator) State() map[string]state.State { result := make(map[string]state.State) @@ -200,7 +204,7 @@ func (o *Operator) runFlow(p Descriptor, operations []operation) error { return err } - shouldRun, err := op.Check() + shouldRun, err := op.Check(app) if err != nil { return err } @@ -216,6 +220,11 @@ func (o *Operator) runFlow(p Descriptor, operations []operation) error { } } + // when application is stopped remove from the operator + if app.State().Status == state.Stopped { + o.deleteApp(p) + } + return nil } @@ -230,8 +239,6 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { return a, nil } - factory := rconfig.NewConnFactory(o.config.RetryConfig.Delay, o.config.RetryConfig.MaxDelay) - specifier, ok := p.(app.Specifier) if !ok { return nil, fmt.Errorf("descriptor is not an app.Specifier") @@ -245,7 +252,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { o.pipelineID, o.config.LoggingConfig.Level.String(), specifier, - factory, + o.srv, o.config, o.logger, o.eventProcessor.OnFailing, @@ -259,6 +266,16 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { return a, nil } +func (o *Operator) deleteApp(p Descriptor) { + o.appsLock.Lock() + defer o.appsLock.Unlock() + + id := p.ID() + + o.logger.Debugf("operator is removing %s from app collection: %v", p.ID(), o.apps) + delete(o.apps, id) +} + func isMonitorable(descriptor Descriptor) bool { isSidecar := app.IsSidecar(descriptor) return !isSidecar // everything is monitorable except sidecar diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go index 76ffdb432d5d..2db96b30f1a5 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go @@ -2,8 +2,6 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -// +build linux darwin - package operation import ( @@ -11,8 +9,10 @@ import ( "math/rand" "os" "path/filepath" + "runtime" "testing" - "time" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" @@ -20,87 +20,78 @@ import ( func TestMain(m *testing.M) { // init supported with test cases - shortSpec := program.Spec{ - Name: "short", - Cmd: "/bin/echo", - Args: []string{"123"}, - } - longSpec := program.Spec{ - Name: "long", - Cmd: "/bin/sh", - Args: []string{"-c", "echo 123; sleep 100"}, - } configurableSpec := program.Spec{ Name: "configurable", Cmd: "configurable", Args: []string{}, } - program.Supported = append(program.Supported, shortSpec, longSpec, configurableSpec) + program.Supported = append(program.Supported, configurableSpec) + + p := getProgram("configurable", "1.0") + spec := p.Spec() + path := spec.BinaryPath + if runtime.GOOS == "windows" { + path += ".exe" + } + if s, err := os.Stat(path); err != nil || s == nil { + panic(fmt.Errorf("binary not available %s", spec.BinaryPath)) + } + + os.Exit(m.Run()) } func TestNotSupported(t *testing.T) { p := getProgram("notsupported", "1.0") - operator, _ := getTestOperator(t, "tests/scripts") + operator, _ := getTestOperator(t, downloadPath, installPath, p) err := operator.start(p, nil) if err == nil { t.Fatal("was expecting error but got none") } } -func TestShortRun(t *testing.T) { - p := getProgram("short", "1.0") +func TestConfigurableRun(t *testing.T) { + p := getProgram("configurable", "1.0") - operator, _ := getTestOperator(t, "tests/scripts") + operator, _ := getTestOperator(t, downloadPath, installPath, p) if err := operator.start(p, nil); err != nil { t.Fatal(err) } + defer operator.stop(p) // failure catch, to ensure no sub-process stays running - // let the watcher kick in - <-time.After(1 * time.Second) - - items := operator.State() - if len(items) == 1 && items[p.ID()].Status == state.Running { - t.Fatalf("Process reattach info not stopped %#v, %+v", items, items[p.ID()].Status) - } + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to running") + } + return nil + }) - os.Remove(filepath.Join(operator.config.DownloadConfig.InstallPath, "short--1.0.yml")) -} - -func TestShortRunInvalid(t *testing.T) { - p := getProgram("bumblebee", "") - operator, _ := getTestOperator(t, "/bin") - if err := operator.start(p, nil); err == nil { - t.Fatal(err) - } - - // let the watcher kick in - <-time.After(1 * time.Second) - - items := operator.State() - if len(items) == 1 && items[p.ID()].Status == state.Running { - t.Fatalf("Process reattach info not stopped %#v, %+v", items, items[p.ID()].Status) - } -} - -func TestLongRunWithStop(t *testing.T) { - p := getProgram("long", "1.0") - - operator, _ := getTestOperator(t, "tests/scripts") - if err := operator.start(p, nil); err != nil { - t.Fatal(err) + // try to configure + cfg := make(map[string]interface{}) + tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32())) + cfg["TestFile"] = tstFilePath + if err := operator.pushConfig(p, cfg); err != nil { + t.Fatalf("failed to config: %v", err) } - // wait for watcher so we know it was now cancelled immediately - <-time.After(1 * time.Second) + waitFor(t, func() error { + if s, err := os.Stat(tstFilePath); err != nil || s == nil { + return fmt.Errorf("failed to create a file using Config call %s", tstFilePath) + } + return nil + }) items := operator.State() item0, ok := items[p.ID()] if !ok || item0.Status != state.Running { - t.Fatalf("Process not running %#v", items) + t.Fatalf("Process no longer running after config %#v", items) } - pid := item0.ProcessInfo.PID // stop the process @@ -108,15 +99,14 @@ func TestLongRunWithStop(t *testing.T) { t.Fatalf("Failed to stop process with PID %d: %v", pid, err) } - // let the watcher kick in - <-time.After(1 * time.Second) - - // check state updated - items = operator.State() - item1, ok := items[p.ID()] - if !ok || item1.Status == state.Running { - t.Fatalf("Process state says running after Stop %#v", items) - } + waitFor(t, func() error { + items := operator.State() + _, ok := items[p.ID()] + if ok { + return fmt.Errorf("state for process, should be removed") + } + return nil + }) // check process stopped proc, err := os.FindProcess(pid) @@ -125,163 +115,243 @@ func TestLongRunWithStop(t *testing.T) { } } -func TestLongRunWithCrash(t *testing.T) { - p := getProgram("long", "1.0") +func TestConfigurableFailed(t *testing.T) { + p := getProgram("configurable", "1.0") - operator, _ := getTestOperator(t, "tests/scripts") + operator, _ := getTestOperator(t, downloadPath, installPath, p) if err := operator.start(p, nil); err != nil { t.Fatal(err) } - - // wait for watcher so we know it was now cancelled immediately - <-time.After(1 * time.Second) - - items := operator.State() - item0, ok := items[p.ID()] - if !ok || item0.Status != state.Running { - t.Fatalf("Process not running %#v", items) - } - - // crash the process - pid := item0.ProcessInfo.PID - proc, err := os.FindProcess(pid) - if err != nil { - t.Fatalf("Failed to get process with PID %d: %v", pid, err) - } - if err := proc.Kill(); err != nil { - t.Fatalf("Failed to kill process with PID %d: %v", pid, err) + defer operator.stop(p) // failure catch, to ensure no sub-process stays running + + var pid int + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to running") + } + pid = item.ProcessInfo.PID + return nil + }) + + // try to configure (with failed status) + cfg := make(map[string]interface{}) + tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32())) + cfg["TestFile"] = tstFilePath + cfg["Status"] = proto.StateObserved_FAILED + if err := operator.pushConfig(p, cfg); err != nil { + t.Fatalf("failed to config: %v", err) } - // let the watcher kick in - <-time.After(3 * time.Second) - - // check process restarted - items = operator.State() - item1, ok := items[p.ID()] - if !ok || item1.Status != state.Running { - t.Fatalf("Process not present after restart %#v", items) + // should still create the file + waitFor(t, func() error { + if s, err := os.Stat(tstFilePath); err != nil || s == nil { + return fmt.Errorf("failed to create a file using Config call %s", tstFilePath) + } + return nil + }) + + // wait for not running status + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status == state.Running { + return fmt.Errorf("process never left running") + } + return nil + }) + + // don't send status anymore + delete(cfg, "Status") + if err := operator.pushConfig(p, cfg); err != nil { + t.Fatalf("failed to config: %v", err) } - newPid := item1.ProcessInfo.PID - if pid == newPid { - t.Fatalf("Process not restarted, still with the same PID %d", pid) - } + // check that it restarted (has a new PID) + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.ProcessInfo == nil { + return fmt.Errorf("in restart loop") + } + if pid == item.ProcessInfo.PID { + return fmt.Errorf("process never restarted") + } + pid = item.ProcessInfo.PID + return nil + }) + + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to back to running") + } + return nil + }) - // stop restarted process + // stop the process if err := operator.stop(p); err != nil { - t.Fatalf("Failed to stop restarted process %d: %v", newPid, err) - } -} - -func TestTwoProcesses(t *testing.T) { - p := getProgram("long", "1.0") - - operator, _ := getTestOperator(t, "tests/scripts") - if err := operator.start(p, nil); err != nil { - t.Fatal(err) - } - - // wait for watcher so we know it was now cancelled immediately - <-time.After(1 * time.Second) - - items := operator.State() - item0, ok := items[p.ID()] - if !ok || item0.Status != state.Running { - t.Fatalf("Process not running %#v", items) - } - - // start the same process again - if err := operator.start(p, nil); err != nil { - t.Fatal(err) - } - - // let the watcher kick in - <-time.After(1 * time.Second) - - items = operator.State() - item1, ok := items[p.ID()] - if !ok || item0.Status != state.Running { - t.Fatalf("Process not running %#v", items) + t.Fatalf("Failed to stop process with PID %d: %v", pid, err) } - if item0.ProcessInfo.PID != item1.ProcessInfo.PID { - t.Fatal("Process got updated, expected the same") + // check process stopped + proc, err := os.FindProcess(pid) + if err != nil && proc != nil { + t.Fatal("Process found") } - - // check process restarted - operator.stop(p) } -func TestConfigurableRun(t *testing.T) { +func TestConfigurableCrash(t *testing.T) { p := getProgram("configurable", "1.0") - spec := p.Spec() - if s, err := os.Stat(spec.BinaryPath); err != nil || s == nil { - t.Fatalf("binary not available %s", spec.BinaryPath) - } else { - t.Logf("found file %v", spec.BinaryPath) - } - - operator, _ := getTestOperator(t, installPath) + operator, _ := getTestOperator(t, downloadPath, installPath, p) if err := operator.start(p, nil); err != nil { t.Fatal(err) } - - // wait for watcher so we know it was now cancelled immediately - <-time.After(1 * time.Second) - - items := operator.State() - item0, ok := items[p.ID()] - if !ok || item0.Status != state.Running { - t.Fatalf("Process not running %#v", items) - } - - pid := item0.ProcessInfo.PID - - // check it is still running - <-time.After(2 * time.Second) - - items = operator.State() - item1, ok := items[p.ID()] - if !ok || item1.Status != state.Running { - t.Fatalf("Process stopped running %#v", items) - } - - newPID := item1.ProcessInfo.PID - if pid != newPID { - t.Fatalf("Process crashed in between first pid: '%v' second pid: '%v'", pid, newPID) - } - - // try to configure + defer operator.stop(p) // failure catch, to ensure no sub-process stays running + + var pid int + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to running") + } + pid = item.ProcessInfo.PID + return nil + }) + + // try to configure (with failed status) cfg := make(map[string]interface{}) tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32())) cfg["TestFile"] = tstFilePath + cfg["Crash"] = true if err := operator.pushConfig(p, cfg); err != nil { t.Fatalf("failed to config: %v", err) } - if s, err := os.Stat(tstFilePath); err != nil || s == nil { - t.Fatalf("failed to create a file using Config call %s", tstFilePath) + // should still create the file + waitFor(t, func() error { + if s, err := os.Stat(tstFilePath); err != nil || s == nil { + return fmt.Errorf("failed to create a file using Config call %s", tstFilePath) + } + return nil + }) + + // wait for not running status + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status == state.Running { + return fmt.Errorf("process never left running") + } + return nil + }) + + // don't send crash anymore + delete(cfg, "Crash") + if err := operator.pushConfig(p, cfg); err != nil { + t.Fatalf("failed to config: %v", err) } + // check that it restarted (has a new PID) + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.ProcessInfo == nil { + return fmt.Errorf("in restart loop") + } + if pid == item.ProcessInfo.PID { + return fmt.Errorf("process never restarted") + } + pid = item.ProcessInfo.PID + return nil + }) + + // let the process get back to ready + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to back to running") + } + return nil + }) + // stop the process if err := operator.stop(p); err != nil { t.Fatalf("Failed to stop process with PID %d: %v", pid, err) } - // let the watcher kick in - <-time.After(1 * time.Second) - - // check reattach collection cleaned up - items = operator.State() - item2, ok := items[p.ID()] - if !ok || item2.Status == state.Running { - t.Fatalf("Process still running after stop %#v", items) - } - // check process stopped proc, err := os.FindProcess(pid) if err != nil && proc != nil { t.Fatal("Process found") } } + +func TestConfigurableStartStop(t *testing.T) { + p := getProgram("configurable", "1.0") + + operator, _ := getTestOperator(t, downloadPath, installPath, p) + defer operator.stop(p) // failure catch, to ensure no sub-process stays running + + // start and stop it 3 times + for i := 0; i < 3; i++ { + if err := operator.start(p, nil); err != nil { + t.Fatal(err) + } + + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to running") + } + return nil + }) + + // stop the process + if err := operator.stop(p); err != nil { + t.Fatalf("Failed to stop process: %v", err) + } + + waitFor(t, func() error { + items := operator.State() + _, ok := items[p.ID()] + if ok { + return fmt.Errorf("state for process, should be removed") + } + return nil + }) + } +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86/main.go b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86/main.go deleted file mode 100644 index 0ca3be9192f0..000000000000 --- a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86/main.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package main - -import ( - "context" - "os" - "path/filepath" - - "gopkg.in/yaml.v2" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/server" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" -) - -func main() { - f, _ := os.OpenFile(filepath.Join(os.TempDir(), "testing.out"), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666) - f.WriteString("starting \n") - s := &configServer{} - if err := server.NewGrpcServer(os.Stdin, s); err != nil { - f.WriteString(err.Error()) - panic(err) - } - f.WriteString("finished \n") -} - -type configServer struct { -} - -// TestConfig is a configuration for testing Config calls -type TestConfig struct { - TestFile string `config:"TestFile" yaml:"TestFile"` -} - -func (*configServer) Config(ctx context.Context, req *grpc.ConfigRequest) (*grpc.ConfigResponse, error) { - cfgString := req.GetConfig() - - testCfg := &TestConfig{} - if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil { - return &grpc.ConfigResponse{}, err - } - - if testCfg.TestFile != "" { - tf, err := os.Create(testCfg.TestFile) - if err != nil { - return &grpc.ConfigResponse{}, err - } - - err = tf.Close() - if err != nil { - return &grpc.ConfigResponse{}, err - } - } - - return &grpc.ConfigResponse{}, nil -} - -// Status return ok. -func (*configServer) Status(ctx context.Context, req *grpc.StatusRequest) (*grpc.StatusResponse, error) { - return &grpc.StatusResponse{Status: "ok"}, nil -} diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86/README.md b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/README.md similarity index 100% rename from x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86/README.md rename to x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/README.md diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go new file mode 100644 index 000000000000..f5de06883cdc --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go @@ -0,0 +1,98 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func main() { + f, _ := os.OpenFile(filepath.Join(os.TempDir(), "testing.out"), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666) + f.WriteString("starting \n") + ctx, cancel := context.WithCancel(context.Background()) + s := &configServer{ + f: f, + ctx: ctx, + cancel: cancel, + } + client, err := client.NewFromReader(os.Stdin, s) + if err != nil { + f.WriteString(err.Error()) + panic(err) + } + s.client = client + err = client.Start(ctx) + if err != nil { + f.WriteString(err.Error()) + panic(err) + } + <-ctx.Done() + f.WriteString("finished \n") +} + +type configServer struct { + f *os.File + ctx context.Context + cancel context.CancelFunc + client *client.Client +} + +func (s *configServer) OnConfig(cfgString string) { + s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file") + + testCfg := &TestConfig{} + if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil { + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err)) + return + } + + if testCfg.TestFile != "" { + tf, err := os.Create(testCfg.TestFile) + if err != nil { + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err)) + return + } + + err = tf.Close() + if err != nil { + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err)) + return + } + } + + if testCfg.Crash { + os.Exit(2) + } + + if testCfg.Status != nil { + s.client.Status(*testCfg.Status, "Custom status") + } else { + s.client.Status(proto.StateObserved_HEALTHY, "Running") + } +} + +func (s *configServer) OnStop() { + s.client.Status(proto.StateObserved_STOPPING, "Stopping") + s.cancel() +} + +func (s *configServer) OnError(err error) { + s.f.WriteString(err.Error()) +} + +// TestConfig is a configuration for testing Config calls +type TestConfig struct { + TestFile string `config:"TestFile" yaml:"TestFile"` + Status *proto.StateObserved_Status `config:"Status" yaml:"Status"` + Crash bool `config:"Crash" yaml:"Crash"` +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/long-1.0-darwin-x86/README.md b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/long-1.0-darwin-x86/README.md deleted file mode 100644 index 8429c59805c3..000000000000 --- a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/long-1.0-darwin-x86/README.md +++ /dev/null @@ -1 +0,0 @@ -Testing program emulating tool which executes and completes with a delay or is long running. diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/short-1.0-darwin-x86/README.md b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/short-1.0-darwin-x86/README.md deleted file mode 100644 index 0576a8b3c4eb..000000000000 --- a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/short-1.0-darwin-x86/README.md +++ /dev/null @@ -1 +0,0 @@ -Testing program emulating tool which executes and completes immediately diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/app.go b/x-pack/elastic-agent/pkg/core/plugin/app/app.go index 2cf2adbdba2b..d85f28b49abc 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/app.go @@ -8,10 +8,13 @@ import ( "context" "fmt" "os" - "path/filepath" "sync" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" @@ -19,8 +22,6 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/tokenbucket" ) @@ -44,11 +45,13 @@ type Application struct { pipelineID string logLevel string spec Specifier - state state.State - grpcClient remoteconfig.Client - clientFactory remoteconfig.ConnectionCreator + srv *server.Server + srvState *server.ApplicationState limiter *tokenbucket.Bucket failureReporter ReportFailureFunc + startContext context.Context + tag Taggable + state state.State uid int gid int @@ -73,7 +76,7 @@ func NewApplication( ctx context.Context, id, appName, pipelineID, logLevel string, spec Specifier, - factory remoteconfig.ConnectionCreator, + srv *server.Server, cfg *config.Config, logger *logger.Logger, failureReporter ReportFailureFunc, @@ -93,7 +96,7 @@ func NewApplication( pipelineID: pipelineID, logLevel: logLevel, spec: spec, - clientFactory: factory, + srv: srv, processConfig: cfg.ProcessConfig, downloadConfig: cfg.DownloadConfig, retryConfig: cfg.RetryConfig, @@ -111,6 +114,13 @@ func (a *Application) Monitor() monitoring.Monitor { return a.monitor } +// State returns the application state. +func (a *Application) State() state.State { + a.appLock.Lock() + defer a.appLock.Unlock() + return a.state +} + // Name returns application name func (a *Application) Name() string { return a.name @@ -121,44 +131,38 @@ func (a *Application) Stop() { a.appLock.Lock() defer a.appLock.Unlock() - if a.state.Status == state.Running && a.state.ProcessInfo != nil { - if closeClient, ok := a.grpcClient.(closer); ok { - closeClient.Close() - } - - process.Stop(a.logger, a.state.ProcessInfo.PID) + if a.state.Status == state.Stopped { + return + } - a.state.Status = state.Stopped - a.state.ProcessInfo = nil - a.grpcClient = nil - - // remove generated configuration if present - filename := fmt.Sprintf(configFileTempl, a.id) - filePath, err := filepath.Abs(filepath.Join(a.downloadConfig.InstallPath, filename)) - if err == nil { - // ignoring error: not critical - os.Remove(filePath) + stopSig := os.Interrupt + if a.srvState != nil { + if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil { + // kill the process if stop through GRPC doesn't work + stopSig = os.Kill } + a.srvState = nil + } + if a.state.ProcessInfo != nil { + if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil { + // no error on signal, so wait for it to stop + _, _ = a.state.ProcessInfo.Process.Wait() + } + a.state.ProcessInfo = nil // cleanup drops a.monitor.Cleanup(a.name, a.pipelineID) } + a.state.Status = state.Stopped + a.state.Message = "Stopped" } -// State returns the state of the application [Running, Stopped]. -func (a *Application) State() state.State { - a.appLock.Lock() - defer a.appLock.Unlock() - - return a.state -} - -func (a *Application) watch(ctx context.Context, p Taggable, proc *os.Process, cfg map[string]interface{}) { +func (a *Application) watch(ctx context.Context, p Taggable, proc *process.Info, cfg map[string]interface{}) { go func() { var procState *os.ProcessState select { - case ps := <-a.waitProc(proc): + case ps := <-a.waitProc(proc.Process): procState = ps case <-a.bgContext.Done(): a.Stop() @@ -166,21 +170,23 @@ func (a *Application) watch(ctx context.Context, p Taggable, proc *os.Process, c } a.appLock.Lock() - s := a.state.Status - a.state.Status = state.Stopped a.state.ProcessInfo = nil - a.appLock.Unlock() + srvState := a.srvState - if procState.Success() { + if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING { + a.appLock.Unlock() return } - if s == state.Running { - // it was a crash, report it async not to block - // process management with networking issues - go a.reportCrash(ctx) - a.Start(ctx, p, cfg) - } + msg := fmt.Sprintf("Exited with code: %d", procState.ExitCode()) + a.state.Status = state.Crashed + a.state.Message = msg + a.appLock.Unlock() + + // it was a crash, report it async not to block + // process management with networking issues + go a.reportCrash(ctx) + a.Start(ctx, p, cfg) }() } diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/client.go b/x-pack/elastic-agent/pkg/core/plugin/app/client.go deleted file mode 100644 index 59d27da6e136..000000000000 --- a/x-pack/elastic-agent/pkg/core/plugin/app/client.go +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package app - -// closer close the connection -type closer interface { - Close() error -} diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/configure.go b/x-pack/elastic-agent/pkg/core/plugin/app/configure.go index 55ee1b8d48f8..412577afedd3 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/configure.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/configure.go @@ -6,29 +6,16 @@ package app import ( "context" - "net" - "time" "gopkg.in/yaml.v2" - "github.com/elastic/beats/v7/libbeat/common/backoff" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig" -) -const ( - // DefaultTimeout is the default timeout for network calls - DefaultTimeout = 60 * time.Second + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" ) -type backoffClient interface { - Backoff() backoff.Backoff -} - // Configure configures the application with the passed configuration. -func (a *Application) Configure(ctx context.Context, config map[string]interface{}) (err error) { +func (a *Application) Configure(_ context.Context, config map[string]interface{}) (err error) { defer func() { if err != nil { // inject App metadata @@ -36,44 +23,23 @@ func (a *Application) Configure(ctx context.Context, config map[string]interface } }() + a.appLock.Lock() + defer a.appLock.Unlock() + if a.state.Status == state.Stopped { return errors.New(ErrAppNotRunning) } - - retryFn := func(ctx context.Context) error { - a.appLock.Lock() - defer a.appLock.Unlock() - - // TODO: check versions(logical clock) in between retries in case newer version sneaked in - - ctx, cancelFn := context.WithTimeout(ctx, DefaultTimeout) - defer cancelFn() - - if a.grpcClient == nil { - return errors.New(ErrClientNotFound) - } - - rawYaml, err := yaml.Marshal(config) - if err != nil { - return errors.New(err, errors.TypeApplication) - } - - configClient, ok := a.grpcClient.(remoteconfig.ConfiguratorClient) - if !ok { - return errors.New(ErrClientNotConfigurable, errors.TypeApplication) - } - - a.logger.Debugf("configuring application %s: %s", a.Name(), string(rawYaml)) - err = configClient.Config(ctx, string(rawYaml)) - - if netErr, ok := err.(net.Error); ok && (netErr.Timeout() || netErr.Temporary()) { - // not fatal, we will retry - return errors.New(netErr, errors.TypeApplication) - } - - // if not transient mark as fatal - return retry.ErrorMakeFatal(err) + if a.srvState == nil { + return errors.New(ErrAppNotRunning) } - return retry.Do(ctx, a.retryConfig, retryFn) + cfgStr, err := yaml.Marshal(config) + if err != nil { + return errors.New(err, errors.TypeApplication) + } + err = a.srvState.UpdateConfig(string(cfgStr)) + if err != nil { + return errors.New(err, errors.TypeApplication) + } + return nil } diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/start.go b/x-pack/elastic-agent/pkg/core/plugin/app/start.go index 2994d2e205c3..aafaff2c4cc1 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/start.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/start.go @@ -6,35 +6,22 @@ package app import ( "context" - "fmt" - "os" + "io" "path/filepath" - "runtime" - "strings" - "time" - "unicode" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/authority" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" -) - -const ( - configurationFlag = "-c" - configFileTempl = "%s.yml" // providing beat id - configFilePermissions = 0644 // writable only by owner ) -// ConfiguratorClient is the client connecting elastic-agent and a process -type stateClient interface { - Status(ctx context.Context) (string, error) - Close() error -} - // Start starts the application with a specified config. func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]interface{}) (err error) { defer func() { @@ -43,17 +30,57 @@ func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]inte err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id)) } }() + + cfgStr, err := yaml.Marshal(cfg) + if err != nil { + return err + } + + // because `Start` can be called by `ApplicationStatusHandler` to perform a restart on failure + // the locking needs to be handled in the correct order. + a.appLock.Lock() + a.startContext = ctx + a.tag = t + srvState := a.srvState + a.appLock.Unlock() + + // Failed applications can be started again. + if srvState != nil { + srvState.SetStatus(proto.StateObserved_STARTING, "Starting") + srvState.UpdateConfig(string(cfgStr)) + } else { + a.appLock.Lock() + a.srvState, err = a.srv.Register(a, string(cfgStr)) + if err != nil { + return err + } + a.appLock.Unlock() + } + + // now that `SetStatus` would call `ApplicationStatusHandler` has occurred the + // reset of `Start` can be held by the lock. a.appLock.Lock() defer a.appLock.Unlock() - if a.state.Status == state.Running { - return nil + if a.state.Status != state.Stopped { + // restarting as it was previously in a different state + a.state.Status = state.Restarting + a.state.Message = "Restarting" + } else { + a.state.Status = state.Starting + a.state.Message = "Starting" } defer func() { if err != nil { - // reportError() - a.state.Status = state.Stopped + if a.srvState != nil { + a.srvState.Destroy() + a.srvState = nil + } + if a.state.ProcessInfo != nil { + _ = a.state.ProcessInfo.Process.Kill() + a.state.ProcessInfo = nil + } } }() @@ -61,16 +88,6 @@ func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]inte return err } - // TODO: provider -> client - ca, err := generateCA() - if err != nil { - return errors.New(err, errors.TypeSecurity) - } - processCreds, err := generateConfigurable(ca) - if err != nil { - return errors.New(err, errors.TypeSecurity) - } - if a.limiter != nil { a.limiter.Add() } @@ -93,26 +110,28 @@ func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]inte a.processConfig, a.uid, a.gid, - processCreds, spec.Args...) if err != nil { return err } - a.waitForGrpc(spec, ca) - - a.grpcClient, err = generateClient(a.state.ProcessInfo.Address, a.clientFactory, ca) - if err != nil { - return errors.New(err, errors.TypeSecurity) - } - a.state.Status = state.Running + // write connect info to stdin + go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin) // setup watcher - a.watch(ctx, t, a.state.ProcessInfo.Process, cfg) + a.watch(ctx, t, a.state.ProcessInfo, cfg) return nil } +func (a *Application) writeToStdin(as *server.ApplicationState, wc io.WriteCloser) { + err := as.WriteConnInfo(wc) + if err != nil { + a.logger.Errorf("failed writing connection info to spawned application: %s", err) + } + _ = wc.Close() +} + func injectLogLevel(logLevel string, args []string) []string { var level string // Translate to level beat understands @@ -134,193 +153,7 @@ func injectLogLevel(logLevel string, args []string) []string { return append(args, "-E", "logging.level="+level) } -func (a *Application) waitForGrpc(spec ProcessSpec, ca *authority.CertificateAuthority) error { - const ( - rounds int = 3 - roundsTimeout time.Duration = 30 * time.Second - retries int = 5 - retryTimeout time.Duration = 2 * time.Second - ) - - checkFn := func(ctx context.Context, address string) error { - return a.checkGrpcHTTP(ctx, address, ca) - } - if isPipe(a.state.ProcessInfo.Address) { - checkFn = a.checkGrpcPipe - } - - for round := 1; round <= rounds; round++ { - for retry := 1; retry <= retries; retry++ { - c, cancelFn := context.WithTimeout(a.bgContext, retryTimeout) - err := checkFn(c, a.state.ProcessInfo.Address) - if err == nil { - cancelFn() - return nil - } - cancelFn() - - // do not wait on last - if retry != retries { - select { - case <-time.After(retryTimeout): - case <-a.bgContext.Done(): - return nil - } - } - } - - // do not wait on last - if round != rounds { - select { - case <-time.After(time.Duration(round) * roundsTimeout): - case <-a.bgContext.Done(): - return nil - } - } - } - - // do not err out, config calls will fail with after some more retries - return nil -} - -func isPipe(address string) bool { - address = strings.TrimPrefix(address, "http+") - return strings.HasPrefix(address, "file:") || - strings.HasPrefix(address, "unix:") || - strings.HasPrefix(address, "npipe") || - strings.HasPrefix(address, `\\.\pipe\`) || - isWindowsPath(address) -} - -func (a *Application) checkGrpcPipe(ctx context.Context, address string) error { - // TODO: not supported yet - return nil -} - -func (a *Application) checkGrpcHTTP(ctx context.Context, address string, ca *authority.CertificateAuthority) error { - grpcClient, err := generateClient(a.state.ProcessInfo.Address, a.clientFactory, ca) - if err != nil { - return errors.New(err, errors.TypeSecurity) - } - - stateClient, ok := grpcClient.(stateClient) - if !ok { - // does not support getting state - // let successive calls fail/succeed - return nil - } - - result, err := stateClient.Status(ctx) - defer stateClient.Close() - if err != nil { - return errors.New(err, "getting state failed", errors.TypeNetwork) - } - - if strings.ToLower(result) != "ok" { - return errors.New( - fmt.Sprintf("getting state failed. not ok state received: '%s'", result), - errors.TypeNetwork) - } - - return nil -} - func injectDataPath(args []string, pipelineID, id string) []string { dataPath := filepath.Join(paths.Data(), "run", pipelineID, id) return append(args, "-E", "path.data="+dataPath) } - -func generateCA() (*authority.CertificateAuthority, error) { - ca, err := authority.NewCA() - if err != nil { - return nil, errors.New(err, "app.Start", errors.TypeSecurity) - } - - return ca, nil -} - -func generateConfigurable(ca *authority.CertificateAuthority) (*process.Creds, error) { - processCreds, err := getProcessCredentials(ca) - if err != nil { - return nil, errors.New(err, errors.TypeSecurity) - } - - return processCreds, nil -} - -func generateClient(address string, factory remoteconfig.ConnectionCreator, ca *authority.CertificateAuthority) (remoteconfig.Client, error) { - connectionProvider, err := getConnectionProvider(ca, address) - if err != nil { - return nil, errors.New(err, errors.TypeNetwork) - } - - grpcClient, err := factory.NewConnection(connectionProvider) - if err != nil { - return nil, errors.New(err, "creating connection", errors.TypeNetwork) - } - - return grpcClient, nil -} - -func getConnectionProvider(ca *authority.CertificateAuthority, address string) (*grpc.ConnectionProvider, error) { - clientPair, err := ca.GeneratePair() - if err != nil { - return nil, errors.New(err, errors.TypeNetwork) - } - - return grpc.NewConnectionProvider(address, ca.Crt(), clientPair.Key, clientPair.Crt), nil -} - -func updateSpecConfig(spec *ProcessSpec, configPath string) error { - // check if config is already provided - configIndex := -1 - for i, v := range spec.Args { - if v == configurationFlag { - configIndex = i - break - } - } - - if configIndex != -1 { - // -c provided - if len(spec.Args) == configIndex+1 { - // -c is last argument, appending - spec.Args = append(spec.Args, configPath) - } - spec.Args[configIndex+1] = configPath - return nil - } - - spec.Args = append(spec.Args, configurationFlag, configPath) - return nil -} - -func getProcessCredentials(ca *authority.CertificateAuthority) (*process.Creds, error) { - // processPK and Cert serves as a server credentials - processPair, err := ca.GeneratePair() - if err != nil { - return nil, errors.New(err, "failed to generate credentials") - } - - return &process.Creds{ - CaCert: ca.Crt(), - PK: processPair.Key, - Cert: processPair.Crt, - }, nil -} - -func isWindowsPath(path string) bool { - if len(path) < 4 { - return false - } - return unicode.IsLetter(rune(path[0])) && path[1] == ':' -} - -func changeOwner(path string, uid, gid int) error { - if runtime.GOOS == "windows" { - // on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError - return nil - } - - return os.Chown(path, uid, gid) -} diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/status.go b/x-pack/elastic-agent/pkg/core/plugin/app/status.go new file mode 100644 index 000000000000..a3039af0c734 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/plugin/app/status.go @@ -0,0 +1,82 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package app + +import ( + "context" + "fmt" + + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + pstate "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" +) + +// ApplicationStatusHandler expects that only Application is registered in the server and updates the +// current state of the application from the OnStatusChange callback from inside the server. +// +// In the case that an application is reported as failed by the server it will then restart the application, unless +// it expects that the application should be stopping. +type ApplicationStatusHandler struct{} + +// OnStatusChange is the handler called by the GRPC server code. +// +// It updates the status of the application and handles restarting the application is needed. +func (*ApplicationStatusHandler) OnStatusChange(state *server.ApplicationState, status proto.StateObserved_Status, msg string) { + app, ok := state.App().(*Application) + if !ok { + panic(errors.New("only *Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected)) + } + + app.appLock.Lock() + + // If the application is stopped, do not update the state. Stopped is a final state + // and should not be overridden. + if app.state.Status == pstate.Stopped { + app.appLock.Unlock() + return + } + + app.state.UpdateFromProto(status) + app.state.Message = msg + if status == proto.StateObserved_FAILED { + // ignore when expected state is stopping + if state.Expected() == proto.StateExpected_STOPPING { + app.appLock.Unlock() + return + } + + // it was a crash, report it async not to block + // process management with networking issues + go app.reportCrash(context.Background()) + + // kill the process + if app.state.ProcessInfo != nil { + _ = app.state.ProcessInfo.Process.Kill() + app.state.ProcessInfo = nil + } + ctx := app.startContext + tag := app.tag + app.appLock.Unlock() + + // it was marshalled to pass into the state, so unmarshall will always succeed + var cfg map[string]interface{} + _ = yaml.Unmarshal([]byte(state.Config()), &cfg) + + err := app.Start(ctx, tag, cfg) + if err != nil { + app.logger.Error(errors.New( + fmt.Sprintf("application '%s' failed to restart", app.id), + errors.TypeApplicationCrash, + errors.M(errors.MetaKeyAppName, app.name), + errors.M(errors.MetaKeyAppName, app.id))) + } + return + } + app.appLock.Unlock() +} diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/cmd_cred.go b/x-pack/elastic-agent/pkg/core/plugin/process/cmd_cred.go index ae254e998979..8b51e2d6265a 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/cmd_cred.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/cmd_cred.go @@ -7,6 +7,7 @@ package process import ( + "math" "os" "os/exec" "path/filepath" @@ -34,3 +35,7 @@ func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg return cmd } + +func isInt32(val int) bool { + return val >= 0 && val <= math.MaxInt32 +} diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/config.go b/x-pack/elastic-agent/pkg/core/plugin/process/config.go index e8a236b5de4f..72e8e466720f 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/config.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/config.go @@ -8,14 +8,8 @@ import "time" // Config for fine tuning new process type Config struct { - MinPortNumber int `yaml:"min_port" config:"min_port"` - MaxPortNumber int `yaml:"max_port" config:"max_port"` - SpawnTimeout time.Duration `yaml:"spawn_timeout" config:"spawn_timeout"` - - // Transport is one of `unix` or `tcp`. `unix` uses unix sockets and is not supported on windows. - // Windows falls back to `tcp` regardless of configuration. - // With invalid configuration fallback to `tcp` is used as well. - Transport string + SpawnTimeout time.Duration `yaml:"spawn_timeout" config:"spawn_timeout"` + StopTimeout time.Duration `yaml:"stop_timeout" config:"stop_timeout"` // TODO: cgroups and namespaces } @@ -23,8 +17,7 @@ type Config struct { // DefaultConfig creates a config with pre-set default values. func DefaultConfig() *Config { return &Config{ - MinPortNumber: 10000, - MaxPortNumber: 30000, - SpawnTimeout: 30 * time.Second, + SpawnTimeout: 30 * time.Second, + StopTimeout: 30 * time.Second, } } diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/process.go b/x-pack/elastic-agent/pkg/core/plugin/process/process.go index 4cb9f7702319..81f1960bf8bb 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/process.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/process.go @@ -5,40 +5,14 @@ package process import ( - "crypto/rand" - "encoding/hex" "fmt" "io" - "math" - mrand "math/rand" - "net" "os" - "path/filepath" - "runtime" - "time" - - "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) -const ( - // DefaultTimeout is timeout for starting a process, needs to be passed as a config - DefaultTimeout = 10 * time.Second - // MinPortNumberKey is a minimum port new process can get for newly created GRPC server - MinPortNumberKey = "MIN_PORT_NUMBER" - // MaxPortNumberKey is a maximum port new process can get for newly created GRPC server - MaxPortNumberKey = "MAX_PORT_NUMBER" - // DefaultMinPort is used when no configuration is provided - DefaultMinPort = 10000 - // DefaultMaxPort is used when no configuration is provided - DefaultMaxPort = 30000 - - transportUnix = "unix" - transportTCP = "tcp" -) - var ( // ErrProcessStartFailedTimeout is a failure of start due to timeout ErrProcessStartFailedTimeout = errors.New("process failed to start due to timeout") @@ -46,16 +20,9 @@ var ( // Info groups information about fresh new process type Info struct { - Address string PID int Process *os.Process -} - -// Creds contains information for securing a communication -type Creds struct { - CaCert []byte - PK []byte - Cert []byte + Stdin io.WriteCloser } // Start starts a new process @@ -63,21 +30,8 @@ type Creds struct { // - network address of child process // - process id // - error -func Start(logger *logger.Logger, path string, config *Config, uid, gid int, creds *Creds, arg ...string) (processInfo *Info, err error) { - // inject env - grpcAddress, err := getGrpcAddress(config) - if err != nil { - return nil, errors.New(err, "failed to acquire grpc address") - } - - logger.Infof("address assigned to the process '%s': '%s'", path, grpcAddress) - - env := []string{ - fmt.Sprintf("SERVER_ADDRESS=%s", grpcAddress), - } - - // create a command - cmd := getCmd(logger, path, env, uid, gid, arg...) +func Start(logger *logger.Logger, path string, config *Config, uid, gid int, arg ...string) (proc *Info, err error) { + cmd := getCmd(logger, path, []string{}, uid, gid, arg...) stdin, err := cmd.StdinPipe() if err != nil { return nil, err @@ -88,120 +42,9 @@ func Start(logger *logger.Logger, path string, config *Config, uid, gid int, cre return nil, errors.New(err, fmt.Sprintf("failed to start '%s'", path)) } - // push credentials - err = pushCredentials(stdin, creds) - return &Info{ PID: cmd.Process.Pid, Process: cmd.Process, - Address: grpcAddress, + Stdin: stdin, }, err } - -// Stop stops the process based on the process id -func Stop(logger *logger.Logger, pid int) error { - proc, err := os.FindProcess(pid) - if err != nil { - // Process not found (it is already killed) we treat as a success - return nil - } - - return proc.Signal(os.Interrupt) -} - -// Attach assumes caller knows all the details about the process -// so it just tries to attach to existing PID and returns Process -// itself for awaiter -func Attach(logger *logger.Logger, pid int) (*Info, error) { - proc, err := os.FindProcess(pid) - if err != nil { - // Process not found we are unable to attach - return nil, err - } - - // We are attaching to an existing process, - // address is already known to caller - return &Info{ - PID: proc.Pid, - Process: proc, - }, nil -} - -func getGrpcAddress(config *Config) (string, error) { - if config.Transport == transportUnix && runtime.GOOS != "windows" { - getGrpcUnixAddress() - } - - return getGrpcTCPAddress(config.MinPortNumber, config.MaxPortNumber) -} - -func getGrpcUnixAddress() (string, error) { - for i := 0; i <= 100; i++ { - name := randSocketName() - if fi, err := os.Stat(name); err != nil || fi == nil { - return name, nil - } - } - - return "", fmt.Errorf("free unix socket not found, retry limit reached") -} - -func getGrpcTCPAddress(minPort, maxPort int) (string, error) { - if minPort == 0 { - minPort = DefaultMinPort - } - - if maxPort == 0 { - maxPort = DefaultMaxPort - } - - jitter := (maxPort - minPort) / 3 - if jitter > 0 { - mrand.Seed(time.Now().UnixNano()) - minPort += mrand.Intn(jitter) - } - - for port := minPort; port <= maxPort; port++ { - desiredAddress := fmt.Sprintf("127.0.0.1:%d", port) - listener, _ := net.Listen("tcp", desiredAddress) - if listener != nil { - // we found available port - listener.Close() - return desiredAddress, nil - } - } - - return "", fmt.Errorf("port not found in range %d-%d", minPort, maxPort) -} - -func randSocketName() string { - randBytes := make([]byte, 10) - rand.Read(randBytes) - return filepath.Join(os.TempDir(), hex.EncodeToString(randBytes)+".sock") -} - -func isInt32(val int) bool { - return val >= 0 && val <= math.MaxInt32 -} - -func pushCredentials(w io.Writer, c *Creds) error { - if c == nil { - return nil - } - - credbytes, err := yaml.Marshal(c) - if err != nil { - return errors.New(err, "decoding credentials") - } - - _, err = w.Write(credbytes) - if err != nil { - return errors.New(err, "passing credentials failed") - } - - // this gives beat with grpc a bit of time to spin up a goroutine and start a server. - // should be ok until we come up with more clever solution. - // Issue: https://github.com/elastic/beats/v7/issues/15634 - <-time.After(1500 * time.Millisecond) - return nil -} diff --git a/x-pack/elastic-agent/pkg/core/plugin/server/server.go b/x-pack/elastic-agent/pkg/core/plugin/server/server.go deleted file mode 100644 index 7f15c22198d6..000000000000 --- a/x-pack/elastic-agent/pkg/core/plugin/server/server.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package server - -import ( - "crypto/tls" - "crypto/x509" - "fmt" - "io" - "io/ioutil" - "net" - "os" - - rpc "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "gopkg.in/yaml.v2" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" -) - -const ( - serverAddressKey = "SERVER_ADDRESS" -) - -// NewGrpcServer creates a server and pairs it with fleet. -// Reads secrets from provided reader, registers provided server -// and starts listening on negotiated address -func NewGrpcServer(secretsReader io.Reader, configServer grpc.ConfiguratorServer) error { - // get creds from agent - var cred *process.Creds - secrets, err := ioutil.ReadAll(secretsReader) - if err != nil { - return errors.New(err, "failed to retrieve secrets from provided input") - } - - err = yaml.Unmarshal(secrets, &cred) - if err != nil { - return errors.New(err, "failed to parse secrets from provided input") - } - - // setup grpc server - serverAddress, found := os.LookupEnv(serverAddressKey) - if !found { - return errors.New("server address not specified") - } - - pair, err := tls.X509KeyPair(cred.Cert, cred.PK) - if err != nil { - return errors.New(err, "failed to load x509 key-pair") - } - - // Create CA cert pool - certPool := x509.NewCertPool() - if ok := certPool.AppendCertsFromPEM(cred.CaCert); !ok { - errors.New("failed to append client certs") - } - - fmt.Printf("Listening at %s\n", serverAddress) - lis, err := net.Listen("tcp", serverAddress) - if err != nil { - return errors.New(err, - fmt.Sprintf("failed to start server: %v", serverAddress), - errors.TypeNetwork, - errors.M(errors.MetaKeyURI, serverAddress)) - } - - // Create the TLS credentials - serverCreds := credentials.NewTLS(&tls.Config{ - ClientAuth: tls.RequireAndVerifyClientCert, - Certificates: []tls.Certificate{pair}, - ClientCAs: certPool, - }) - - // Create the gRPC server with the credentials - srv := rpc.NewServer(rpc.Creds(serverCreds)) - - // Register the handler object - grpc.RegisterConfiguratorServer(srv, configServer) - - // Serve and Listen - if err := srv.Serve(lis); err != nil { - return errors.New(err, - fmt.Sprintf("grpc serve error: %s", serverAddress), - errors.TypeNetwork, - errors.M(errors.MetaKeyURI, serverAddress)) - } - - return nil -} diff --git a/x-pack/elastic-agent/pkg/core/plugin/state/state.go b/x-pack/elastic-agent/pkg/core/plugin/state/state.go index 0c6b35dbc913..a3a4419e6fb6 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/state/state.go +++ b/x-pack/elastic-agent/pkg/core/plugin/state/state.go @@ -4,7 +4,10 @@ package state -import "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" +import ( + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) // Status describes the current status of the application process. type Status int @@ -12,9 +15,21 @@ type Status int const ( // Stopped is status describing not running application. Stopped Status = iota - // Running signals that application is currently running. + // Starting is status describing application is starting. + Starting + // Configuring is status describing application is configuring. + Configuring + // Running is status describing application is running. Running - // Restarting means process crashed and is being started again. + // Degraded is status describing application is degraded. + Degraded + // Failed is status describing application is failed. + Failed + // Stopping is status describing application is stopping. + Stopping + // Crashed is status describing application is crashed. + Crashed + // Restarting is status describing application is restarting. Restarting ) @@ -22,4 +37,23 @@ const ( type State struct { ProcessInfo *process.Info Status Status + Message string +} + +// UpdateFromProto updates the status from the status from the GRPC protocol. +func (s *State) UpdateFromProto(status proto.StateObserved_Status) { + switch status { + case proto.StateObserved_STARTING: + s.Status = Starting + case proto.StateObserved_CONFIGURING: + s.Status = Configuring + case proto.StateObserved_HEALTHY: + s.Status = Running + case proto.StateObserved_DEGRADED: + s.Status = Degraded + case proto.StateObserved_FAILED: + s.Status = Failed + case proto.StateObserved_STOPPING: + s.Status = Stopping + } } diff --git a/x-pack/elastic-agent/pkg/core/remoteconfig/config.go b/x-pack/elastic-agent/pkg/core/remoteconfig/config.go deleted file mode 100644 index 5e9b879d2997..000000000000 --- a/x-pack/elastic-agent/pkg/core/remoteconfig/config.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package remoteconfig - -import ( - "context" -) - -// Client for remote calls -type Client interface{} - -// ConfiguratorClient is the client connecting agent and a process -type ConfiguratorClient interface { - Config(ctx context.Context, config string) error - Close() error -} - -// ConnectionCreator describes a creator of connections. -// ConnectionCreator should be used in client vault to generate new connections. -type ConnectionCreator interface { - NewConnection(address ConnectionProvider) (Client, error) -} - -// ConnectionProvider is a basic provider everybody needs to implement -// in order to provide a valid connection. -// Minimal set of properties is: address -type ConnectionProvider interface { - Address() string -} diff --git a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/configclient.go b/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/configclient.go deleted file mode 100644 index 1ca7ba6b33e2..000000000000 --- a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/configclient.go +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package grpc - -import ( - context "context" - "errors" - "time" - - grpc "google.golang.org/grpc" - rpc "google.golang.org/grpc" - - "github.com/elastic/beats/v7/libbeat/common/backoff" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig" -) - -var ( - // ErrNotGrpcClient is used when connection passed into a factory is not a grpc connection - ErrNotGrpcClient = errors.New("not a grpc client") - // ErrProviderNotProvided is used when provider passed into factory is not provided - ErrProviderNotProvided = errors.New("provider not provided") - // ErrProviderIncorrectType is used when provider passed into factory does not implement grpcConnectionProvided - ErrProviderIncorrectType = errors.New("provided provider has incorrect type") -) - -// CreateConfiguratorClient creates a new client from a connection passed in. -// This wraps generated grpc implementation so the change of the underlying -// technology is just the change of the namespace. -func CreateConfiguratorClient(conn interface{}, delay, maxDelay time.Duration) (remoteconfig.ConfiguratorClient, error) { - grpcConn, ok := conn.(*rpc.ClientConn) - if !ok { - return nil, ErrNotGrpcClient - } - - var boff backoff.Backoff - done := make(chan struct{}) - - if delay > 0 && maxDelay > 0 { - boff = backoff.NewEqualJitterBackoff(done, delay, maxDelay) - } else { - // no retry strategy configured - boff = NewNoopBackoff() - } - - return &client{ - grpcConn: grpcConn, - client: NewConfiguratorClient(grpcConn), - backoff: boff, - done: done, - }, nil -} - -type client struct { - grpcConn *grpc.ClientConn - client ConfiguratorClient - backoff backoff.Backoff - done chan struct{} -} - -// Config performs grpc Config request. -func (c *client) Config(ctx context.Context, config string) error { - request := ConfigRequest{ - Config: string(config), - } - - _, err := c.client.Config(ctx, &request) - backoff.WaitOnError(c.backoff, err) - - return err -} - -// Status performs grpc Status request. -func (c *client) Status(ctx context.Context) (string, error) { - request := StatusRequest{} - res, err := c.client.Status(ctx, &request) - if err != nil { - return "", err - } - - return res.Status, nil -} - -// Close cleans up resources. -func (c *client) Close() error { - close(c.done) - return c.grpcConn.Close() -} - -func (c *client) Backoff() backoff.Backoff { - return c.backoff -} diff --git a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/connection_provider.go b/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/connection_provider.go deleted file mode 100644 index f6bc612507d5..000000000000 --- a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/connection_provider.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package grpc - -import ( - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig" -) - -var _ remoteconfig.ConnectionProvider = (*ConnectionProvider)(nil) -var _ grpcConnectionProvider = (*ConnectionProvider)(nil) - -// ConnectionProvider is a connection provider for grpc connections -type ConnectionProvider struct { - address string - caCrt []byte - clientPrivateKey []byte - clientCert []byte -} - -type grpcConnectionProvider interface { - remoteconfig.ConnectionProvider - CA() []byte - Cert() []byte - PK() []byte - IsSecured() bool -} - -// NewConnectionProvider creates a new connection provider for grpc connections -func NewConnectionProvider(address string, caCrt []byte, clientPrivateKey, clientCert []byte) *ConnectionProvider { - return &ConnectionProvider{ - address: address, - caCrt: caCrt, - clientPrivateKey: clientPrivateKey, - clientCert: clientCert, - } -} - -// Address returns an address used for connecting to a client -func (c *ConnectionProvider) Address() string { return c.address } - -// CA returns a certificate authority associated with a connection -func (c *ConnectionProvider) CA() []byte { return c.caCrt } - -// Cert returns a public certificate associated with a connection -func (c *ConnectionProvider) Cert() []byte { return c.clientCert } - -// PK returns a private key associated with a connection -func (c *ConnectionProvider) PK() []byte { return c.clientPrivateKey } - -// IsSecured returns true if all bits for setting up a secure connection were provided -func (c *ConnectionProvider) IsSecured() bool { - return c.caCrt != nil && c.clientCert != nil && c.clientPrivateKey != nil -} diff --git a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/factory.go b/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/factory.go deleted file mode 100644 index 62ddb418be99..000000000000 --- a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/factory.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package grpc - -import ( - "crypto/tls" - "crypto/x509" - "time" - - "google.golang.org/grpc/credentials" - - rpc "google.golang.org/grpc" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig" -) - -// NewConnFactory creates a factory used to create connection. Hides implementation details -// of the underlying connections. -func NewConnFactory(backoffDelay, backoffMaxDelay time.Duration) remoteconfig.ConnectionCreator { - return &connectionFactory{ - backoffDelay: backoffDelay, - backoffMaxDelay: backoffMaxDelay, - } -} - -type connectionFactory struct { - backoffDelay time.Duration - backoffMaxDelay time.Duration -} - -// NewConnection creates a connection -func (c *connectionFactory) NewConnection(provider remoteconfig.ConnectionProvider) (remoteconfig.Client, error) { - if provider == nil { - return nil, ErrProviderNotProvided - } - - grpcProvider, ok := provider.(grpcConnectionProvider) - if !ok { - return nil, ErrProviderIncorrectType - } - - if !grpcProvider.IsSecured() { - conn, err := rpc.Dial(provider.Address(), rpc.WithInsecure()) - if err != nil { - return nil, err - } - - return CreateConfiguratorClient(conn, c.backoffDelay, c.backoffMaxDelay) - } - - // Load client certificate - pair, err := tls.X509KeyPair(grpcProvider.Cert(), grpcProvider.PK()) - if err != nil { - return nil, errors.New(err, "creating client certificate pair") - } - - // Load Cert Auth - certPool := x509.NewCertPool() - if ok := certPool.AppendCertsFromPEM(grpcProvider.CA()); !ok { - return nil, errors.New("failed to append client certificate to CA pool") - } - - // Construct credentials - creds := credentials.NewTLS(&tls.Config{ - RootCAs: certPool, - Certificates: []tls.Certificate{pair}, - ServerName: "localhost", - }) - - conn, err := rpc.Dial(provider.Address(), rpc.WithTransportCredentials(creds)) - if err != nil { - return nil, err - } - - return CreateConfiguratorClient(conn, c.backoffDelay, c.backoffMaxDelay) -} diff --git a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/noop_backoff.go b/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/noop_backoff.go deleted file mode 100644 index 6599d106734a..000000000000 --- a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/noop_backoff.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package grpc - -import ( - "github.com/elastic/beats/v7/libbeat/common/backoff" -) - -// NoopBackoff implements a backoff interface without any wait. -// Used when no backoff is configured. -type NoopBackoff struct{} - -// NewNoopBackoff returns a new EqualJitter object. -func NewNoopBackoff() backoff.Backoff { - return &NoopBackoff{} -} - -// Reset resets the duration of the backoff. -func (b *NoopBackoff) Reset() {} - -// Wait block until either the timer is completed or channel is done. -func (b *NoopBackoff) Wait() bool { - return true -} diff --git a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/remote_config.pb.go b/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/remote_config.pb.go deleted file mode 100644 index 4048c8a0ee19..000000000000 --- a/x-pack/elastic-agent/pkg/core/remoteconfig/grpc/remote_config.pb.go +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: remote_config.proto - -package grpc - -import ( - context "context" - fmt "fmt" - math "math" - - proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type ConfigRequest struct { - Config string `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *ConfigRequest) Reset() { *m = ConfigRequest{} } -func (m *ConfigRequest) String() string { return proto.CompactTextString(m) } -func (*ConfigRequest) ProtoMessage() {} -func (*ConfigRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_16fc0d99571fe457, []int{0} -} - -func (m *ConfigRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_ConfigRequest.Unmarshal(m, b) -} -func (m *ConfigRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_ConfigRequest.Marshal(b, m, deterministic) -} -func (m *ConfigRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ConfigRequest.Merge(m, src) -} -func (m *ConfigRequest) XXX_Size() int { - return xxx_messageInfo_ConfigRequest.Size(m) -} -func (m *ConfigRequest) XXX_DiscardUnknown() { - xxx_messageInfo_ConfigRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_ConfigRequest proto.InternalMessageInfo - -func (m *ConfigRequest) GetConfig() string { - if m != nil { - return m.Config - } - return "" -} - -type ConfigResponse struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *ConfigResponse) Reset() { *m = ConfigResponse{} } -func (m *ConfigResponse) String() string { return proto.CompactTextString(m) } -func (*ConfigResponse) ProtoMessage() {} -func (*ConfigResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_16fc0d99571fe457, []int{1} -} - -func (m *ConfigResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_ConfigResponse.Unmarshal(m, b) -} -func (m *ConfigResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_ConfigResponse.Marshal(b, m, deterministic) -} -func (m *ConfigResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_ConfigResponse.Merge(m, src) -} -func (m *ConfigResponse) XXX_Size() int { - return xxx_messageInfo_ConfigResponse.Size(m) -} -func (m *ConfigResponse) XXX_DiscardUnknown() { - xxx_messageInfo_ConfigResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_ConfigResponse proto.InternalMessageInfo - -type StatusRequest struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *StatusRequest) Reset() { *m = StatusRequest{} } -func (m *StatusRequest) String() string { return proto.CompactTextString(m) } -func (*StatusRequest) ProtoMessage() {} -func (*StatusRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_16fc0d99571fe457, []int{2} -} - -func (m *StatusRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_StatusRequest.Unmarshal(m, b) -} -func (m *StatusRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_StatusRequest.Marshal(b, m, deterministic) -} -func (m *StatusRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_StatusRequest.Merge(m, src) -} -func (m *StatusRequest) XXX_Size() int { - return xxx_messageInfo_StatusRequest.Size(m) -} -func (m *StatusRequest) XXX_DiscardUnknown() { - xxx_messageInfo_StatusRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_StatusRequest proto.InternalMessageInfo - -type StatusResponse struct { - Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"Status,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *StatusResponse) Reset() { *m = StatusResponse{} } -func (m *StatusResponse) String() string { return proto.CompactTextString(m) } -func (*StatusResponse) ProtoMessage() {} -func (*StatusResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_16fc0d99571fe457, []int{3} -} - -func (m *StatusResponse) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_StatusResponse.Unmarshal(m, b) -} -func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic) -} -func (m *StatusResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_StatusResponse.Merge(m, src) -} -func (m *StatusResponse) XXX_Size() int { - return xxx_messageInfo_StatusResponse.Size(m) -} -func (m *StatusResponse) XXX_DiscardUnknown() { - xxx_messageInfo_StatusResponse.DiscardUnknown(m) -} - -var xxx_messageInfo_StatusResponse proto.InternalMessageInfo - -func (m *StatusResponse) GetStatus() string { - if m != nil { - return m.Status - } - return "" -} - -func init() { - proto.RegisterType((*ConfigRequest)(nil), "remoteconfig.ConfigRequest") - proto.RegisterType((*ConfigResponse)(nil), "remoteconfig.ConfigResponse") - proto.RegisterType((*StatusRequest)(nil), "remoteconfig.StatusRequest") - proto.RegisterType((*StatusResponse)(nil), "remoteconfig.StatusResponse") -} - -func init() { proto.RegisterFile("remote_config.proto", fileDescriptor_16fc0d99571fe457) } - -var fileDescriptor_16fc0d99571fe457 = []byte{ - // 175 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x4a, 0xcd, 0xcd, - 0x2f, 0x49, 0x8d, 0x4f, 0xce, 0xcf, 0x4b, 0xcb, 0x4c, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, - 0xe2, 0x81, 0x08, 0x42, 0xc4, 0x94, 0xd4, 0xb9, 0x78, 0x9d, 0xc1, 0xac, 0xa0, 0xd4, 0xc2, 0xd2, - 0xd4, 0xe2, 0x12, 0x21, 0x31, 0x2e, 0x36, 0x88, 0x94, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x67, 0x10, - 0x94, 0xa7, 0x24, 0xc0, 0xc5, 0x07, 0x53, 0x58, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0xaa, 0xc4, 0xcf, - 0xc5, 0x1b, 0x5c, 0x92, 0x58, 0x52, 0x5a, 0x0c, 0xd5, 0xaa, 0xa4, 0xc1, 0xc5, 0x07, 0x13, 0x80, - 0x28, 0x01, 0x19, 0x06, 0x11, 0x81, 0x19, 0x06, 0xe1, 0x19, 0xcd, 0x61, 0xe4, 0xe2, 0x81, 0x98, - 0x56, 0x5a, 0x94, 0x58, 0x92, 0x5f, 0x24, 0xe4, 0xca, 0xc5, 0x06, 0xe1, 0x0b, 0x49, 0xeb, 0x21, - 0xbb, 0x4f, 0x0f, 0xc5, 0x71, 0x52, 0x32, 0xd8, 0x25, 0xa1, 0x0e, 0x62, 0x00, 0x19, 0x03, 0xb1, - 0x01, 0xdd, 0x18, 0x14, 0x87, 0xa2, 0x1b, 0x83, 0xea, 0x68, 0x25, 0x86, 0x24, 0x36, 0x70, 0x48, - 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x06, 0x93, 0xe1, 0x10, 0x40, 0x01, 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// ConfiguratorClient is the client API for Configurator service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type ConfiguratorClient interface { - Config(ctx context.Context, in *ConfigRequest, opts ...grpc.CallOption) (*ConfigResponse, error) - Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) -} - -type configuratorClient struct { - cc *grpc.ClientConn -} - -func NewConfiguratorClient(cc *grpc.ClientConn) ConfiguratorClient { - return &configuratorClient{cc} -} - -func (c *configuratorClient) Config(ctx context.Context, in *ConfigRequest, opts ...grpc.CallOption) (*ConfigResponse, error) { - out := new(ConfigResponse) - err := c.cc.Invoke(ctx, "/remoteconfig.Configurator/Config", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *configuratorClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { - out := new(StatusResponse) - err := c.cc.Invoke(ctx, "/remoteconfig.Configurator/Status", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// ConfiguratorServer is the server API for Configurator service. -type ConfiguratorServer interface { - Config(context.Context, *ConfigRequest) (*ConfigResponse, error) - Status(context.Context, *StatusRequest) (*StatusResponse, error) -} - -// UnimplementedConfiguratorServer can be embedded to have forward compatible implementations. -type UnimplementedConfiguratorServer struct { -} - -func (*UnimplementedConfiguratorServer) Config(ctx context.Context, req *ConfigRequest) (*ConfigResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Config not implemented") -} -func (*UnimplementedConfiguratorServer) Status(ctx context.Context, req *StatusRequest) (*StatusResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") -} - -func RegisterConfiguratorServer(s *grpc.Server, srv ConfiguratorServer) { - s.RegisterService(&_Configurator_serviceDesc, srv) -} - -func _Configurator_Config_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ConfigRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ConfiguratorServer).Config(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/remoteconfig.Configurator/Config", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ConfiguratorServer).Config(ctx, req.(*ConfigRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Configurator_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StatusRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(ConfiguratorServer).Status(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/remoteconfig.Configurator/Status", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ConfiguratorServer).Status(ctx, req.(*StatusRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Configurator_serviceDesc = grpc.ServiceDesc{ - ServiceName: "remoteconfig.Configurator", - HandlerType: (*ConfiguratorServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Config", - Handler: _Configurator_Config_Handler, - }, - { - MethodName: "Status", - Handler: _Configurator_Status_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "remote_config.proto", -} diff --git a/x-pack/elastic-agent/pkg/core/server/config.go b/x-pack/elastic-agent/pkg/core/server/config.go new file mode 100644 index 000000000000..46253431a452 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/server/config.go @@ -0,0 +1,41 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +// Config is a configuration of GRPC server. +type Config struct { + Address string `config:"address"` + Port uint16 `config:"port"` +} + +// DefaultGRPCConfig creates a default server configuration. +func DefaultGRPCConfig() *Config { + return &Config{ + Address: "localhost", + Port: 6789, + } +} + +// NewFromConfig creates a new GRPC server for clients to connect to. +func NewFromConfig(logger *logger.Logger, cfg *config.Config, handler Handler) (*Server, error) { + wrappedConfig := &struct { + GRPC *Config `config:"grpc"` + }{ + GRPC: DefaultGRPCConfig(), + } + + if err := cfg.Unpack(&wrappedConfig); err != nil { + return nil, err + } + + return New(logger, fmt.Sprintf("%s:%d", wrappedConfig.GRPC.Address, wrappedConfig.GRPC.Port), handler) +} diff --git a/x-pack/elastic-agent/pkg/core/server/config_test.go b/x-pack/elastic-agent/pkg/core/server/config_test.go new file mode 100644 index 000000000000..9f659319cc86 --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/server/config_test.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" +) + +func TestNewFromConfig(t *testing.T) { + cfg := config.MustNewConfigFrom(map[string]interface{}{ + "grpc": map[string]interface{}{ + "address": "0.0.0.0", + "port": 9876, + }, + }) + l := newErrorLogger(t) + srv, err := NewFromConfig(l, cfg, &StubHandler{}) + require.NoError(t, err) + assert.Equal(t, "0.0.0.0:9876", srv.getListenAddr()) +} diff --git a/x-pack/elastic-agent/pkg/core/server/server.go b/x-pack/elastic-agent/pkg/core/server/server.go index 498b1b8b161b..38db57234b65 100644 --- a/x-pack/elastic-agent/pkg/core/server/server.go +++ b/x-pack/elastic-agent/pkg/core/server/server.go @@ -93,7 +93,6 @@ type Handler interface { // Server is the GRPC server that the launched applications connect back to. type Server struct { - lock sync.RWMutex logger *logger.Logger ca *authority.CertificateAuthority listenAddr string @@ -104,7 +103,7 @@ type Server struct { watchdogDone chan bool watchdogWG sync.WaitGroup - apps map[string]*ApplicationState + apps sync.Map // overridden in tests watchdogCheckInterval time.Duration @@ -122,7 +121,6 @@ func New(logger *logger.Logger, listenAddr string, handler Handler) (*Server, er ca: ca, listenAddr: listenAddr, handler: handler, - apps: make(map[string]*ApplicationState), watchdogCheckInterval: WatchdogCheckLoop, checkInMinTimeout: client.CheckinMinimumTimeout + CheckinMinimumTimeoutGracePeriod, }, nil @@ -176,14 +174,16 @@ func (s *Server) Stop() { // Get returns the application state from the server for the passed application. func (s *Server) Get(app interface{}) (*ApplicationState, bool) { - s.lock.RLock() - defer s.lock.RUnlock() - for _, appState := range s.apps { - if appState.app == app { - return appState, true + var foundState *ApplicationState + s.apps.Range(func(_ interface{}, val interface{}) bool { + as := val.(*ApplicationState) + if as.app == app { + foundState = as + return false } - } - return nil, false + return true + }) + return foundState, foundState != nil } // Register registers a new application to connect to the server. @@ -222,9 +222,7 @@ func (s *Server) Register(app interface{}, config string) (*ApplicationState, er sentActions: make(map[string]*sentAction), actionsConn: true, } - s.lock.Lock() - defer s.lock.Unlock() - s.apps[appState.token] = appState + s.apps.Store(appState.token, appState) return appState, nil } @@ -559,9 +557,7 @@ func (as *ApplicationState) Stop(timeout time.Duration) error { func (as *ApplicationState) Destroy() { as.destroyActionsStream() as.destroyCheckinStream() - as.srv.lock.Lock() - delete(as.srv.apps, as.token) - as.srv.lock.Unlock() + as.srv.apps.Delete(as.token) } // UpdateConfig pushes an updated configuration to the connected application. @@ -625,6 +621,49 @@ func (as *ApplicationState) PerformAction(name string, params map[string]interfa return res.result, res.err } +// App returns the registered app for the state. +func (as *ApplicationState) App() interface{} { + return as.app +} + +// Expected returns the expected state of the process. +func (as *ApplicationState) Expected() proto.StateExpected_State { + as.checkinLock.RLock() + defer as.checkinLock.RUnlock() + return as.expected +} + +// Config returns the expected config of the process. +func (as *ApplicationState) Config() string { + as.checkinLock.RLock() + defer as.checkinLock.RUnlock() + return as.expectedConfig +} + +// Status returns the current observed status. +func (as *ApplicationState) Status() (proto.StateObserved_Status, string) { + as.checkinLock.RLock() + defer as.checkinLock.RUnlock() + return as.status, as.statusMessage +} + +// SetStatus allows the status to be overwritten by the agent. +// +// This status will be overwritten by the client if it reconnects and updates it status. +func (as *ApplicationState) SetStatus(status proto.StateObserved_Status, msg string) { + as.checkinLock.RLock() + prevStatus := as.status + prevMessage := as.statusMessage + as.status = status + as.statusMessage = msg + as.checkinLock.RUnlock() + + // alert the service handler that status has changed for the application + if prevStatus != status || prevMessage != msg { + as.srv.handler.OnStatusChange(as, status, msg) + } +} + // updateStatus updates the current observed status from the application, sends the expected state back to the // application if the server expects it to be different then its observed state, and alerts the handler on the // server when the application status has changed. @@ -767,9 +806,9 @@ func (s *Server) watchdog() { case <-time.After(s.watchdogCheckInterval): } - s.lock.RLock() now := time.Now().UTC() - for _, serverApp := range s.apps { + s.apps.Range(func(_ interface{}, val interface{}) bool { + serverApp := val.(*ApplicationState) serverApp.checkinLock.RLock() statusTime := serverApp.statusTime serverApp.checkinLock.RUnlock() @@ -798,27 +837,33 @@ func (s *Server) watchdog() { } } serverApp.flushExpiredActions() - } - s.lock.RUnlock() + return true + }) } } // getByToken returns an application state by its token. func (s *Server) getByToken(token string) (*ApplicationState, bool) { - s.lock.RLock() - defer s.lock.RUnlock() - a, ok := s.apps[token] - return a, ok + val, ok := s.apps.Load(token) + if ok { + return val.(*ApplicationState), true + } + return nil, false } // getCertificate returns the TLS certificate based on the clientHello or errors if not found. func (s *Server) getCertificate(chi *tls.ClientHelloInfo) (*tls.Certificate, error) { - s.lock.RLock() - defer s.lock.RUnlock() - for _, app := range s.apps { - if app.srvName == chi.ServerName { - return app.cert.Certificate, nil + var cert *tls.Certificate + s.apps.Range(func(_ interface{}, val interface{}) bool { + sa := val.(*ApplicationState) + if sa.srvName == chi.ServerName { + cert = sa.cert.Certificate + return false } + return true + }) + if cert != nil { + return cert, nil } return nil, errors.New("no supported TLS certificate", errors.TypeSecurity) } diff --git a/x-pack/elastic-agent/proto/remote_config.proto b/x-pack/elastic-agent/proto/remote_config.proto deleted file mode 100644 index 7b151a9205fe..000000000000 --- a/x-pack/elastic-agent/proto/remote_config.proto +++ /dev/null @@ -1,22 +0,0 @@ - -syntax = "proto3"; -package remoteconfig; - -message ConfigRequest { - string config = 1; -} - -message ConfigResponse { -} - -message StatusRequest{ -} - -message StatusResponse { - string Status = 1; -} - -service Configurator { - rpc Config(ConfigRequest) returns (ConfigResponse) {} - rpc Status(StatusRequest) returns (StatusResponse) {} -} diff --git a/x-pack/libbeat/management/fleet/config_server.go b/x-pack/libbeat/management/fleet/config_server.go deleted file mode 100644 index 5a74d50c981f..000000000000 --- a/x-pack/libbeat/management/fleet/config_server.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package fleet - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" -) - -const ( - defaultTimeout = 15 * time.Second -) - -// Server is a server for handling communication between -// beat and Elastic Agent. -type Server struct { - configChan chan<- map[string]interface{} -} - -// NewConfigServer creates a new grpc configuration server for receiving -// configurations from Elastic Agent. -func NewConfigServer(configChan chan<- map[string]interface{}) *Server { - return &Server{ - configChan: configChan, - } -} - -// Config is a handler of a call made by agent pushing latest configuration. -func (s *Server) Config(ctx context.Context, req *grpc.ConfigRequest) (*grpc.ConfigResponse, error) { - cfgString := req.GetConfig() - - var configMap common.MapStr - uconfig, err := common.NewConfigFrom(cfgString) - if err != nil { - return &grpc.ConfigResponse{}, fmt.Errorf("config blocks unsuccessfully generated: %+v", err) - } - - err = uconfig.Unpack(&configMap) - if err != nil { - return &grpc.ConfigResponse{}, fmt.Errorf("config blocks unsuccessfully generated: %+v", err) - } - - select { - case s.configChan <- configMap: - case <-time.After(defaultTimeout): - return &grpc.ConfigResponse{}, errors.New("failed to push configuration: Timeout") - } - return &grpc.ConfigResponse{}, nil -} - -// Status returns OK. -func (s *Server) Status(ctx context.Context, req *grpc.StatusRequest) (*grpc.StatusResponse, error) { - return &grpc.StatusResponse{Status: "ok"}, nil -} diff --git a/x-pack/libbeat/management/fleet/manager.go b/x-pack/libbeat/management/fleet/manager.go index 483b697eda38..55903480f069 100644 --- a/x-pack/libbeat/management/fleet/manager.go +++ b/x-pack/libbeat/management/fleet/manager.go @@ -5,43 +5,38 @@ package fleet import ( + "context" "fmt" "os" "sort" - "sync" "github.com/gofrs/uuid" "github.com/pkg/errors" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/server" "github.com/elastic/beats/v7/x-pack/libbeat/management/api" xmanagement "github.com/elastic/beats/v7/x-pack/libbeat/management" ) -// ConfigManager provides a functionality to retrieve config channel -// using which manager is informed about config changes. -type ConfigManager interface { - ConfigChan() chan<- map[string]interface{} -} - // Manager handles internal config updates. By retrieving // new configs from Kibana and applying them to the Beat. type Manager struct { config *Config logger *logp.Logger beatUUID uuid.UUID - done chan struct{} registry *reload.Registry - wg sync.WaitGroup blacklist *xmanagement.ConfigBlacklist + client *client.Client - configChan chan map[string]interface{} + stopFunc func() } // NewFleetManager returns a X-Pack Beats Fleet Management manager. @@ -57,32 +52,34 @@ func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID // NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager. func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { - var blacklist *xmanagement.ConfigBlacklist + log := logp.NewLogger(management.DebugK) - if c.Enabled && c.Mode == xmanagement.ModeFleet { - var err error + m := &Manager{ + config: c, + logger: log.Named("fleet"), + beatUUID: beatUUID, + registry: registry, + } + var err error + var blacklist *xmanagement.ConfigBlacklist + var eac *client.Client + if c.Enabled && c.Mode == xmanagement.ModeFleet { // Initialize configs blacklist blacklist, err = xmanagement.NewConfigBlacklist(c.Blacklist) if err != nil { return nil, errors.Wrap(err, "wrong settings for configurations blacklist") } - } - - log := logp.NewLogger(management.DebugK) - m := &Manager{ - config: c, - blacklist: blacklist, - logger: log.Named("fleet"), - done: make(chan struct{}), - beatUUID: beatUUID, - registry: registry, - configChan: make(chan map[string]interface{}), + // Initialize the client + eac, err = client.NewFromReader(os.Stdin, m) + if err != nil { + return nil, errors.Wrap(err, "failed to create elastic-agent-client") + } } - go m.startGrpcServer() - + m.blacklist = blacklist + m.client = eac return m, nil } @@ -91,13 +88,8 @@ func (cm *Manager) Enabled() bool { return cm.config.Enabled && cm.config.Mode == xmanagement.ModeFleet } -// ConfigChan returns a channel used to communicate configuration changes. -func (cm *Manager) ConfigChan() chan<- map[string]interface{} { - return cm.configChan -} - // Start the config manager -func (cm *Manager) Start() { +func (cm *Manager) Start(stopFunc func()) { if !cm.Enabled() { return } @@ -105,8 +97,11 @@ func (cm *Manager) Start() { cfgwarn.Beta("Fleet management is enabled") cm.logger.Info("Starting fleet management service") - cm.wg.Add(1) - go cm.worker() + cm.stopFunc = stopFunc + err := cm.client.Start(context.Background()) + if err != nil { + cm.logger.Errorf("failed to start elastic-agent-client: %s", err) + } } // Stop the config manager @@ -115,10 +110,8 @@ func (cm *Manager) Stop() { return } - // stop collecting configuration cm.logger.Info("Stopping fleet management service") - close(cm.done) - cm.wg.Wait() + cm.client.Stop() } // CheckRawConfig check settings are correct to start the beat. This method @@ -129,30 +122,55 @@ func (cm *Manager) CheckRawConfig(cfg *common.Config) error { return nil } -func (cm *Manager) worker() { - defer cm.wg.Done() +func (cm *Manager) OnConfig(s string) { + cm.client.Status(proto.StateObserved_CONFIGURING, "Updating configuration") - // Start worker loop: fetch + apply new settings -WORKERLOOP: - for { - select { - case cfg := <-cm.configChan: - blocks, err := cm.toConfigBlocks(cfg) - if err != nil { - cm.logger.Errorf("Could not apply the configuration, error: %+v", err) - continue WORKERLOOP - } + var configMap common.MapStr + uconfig, err := common.NewConfigFrom(s) + if err != nil { + err = errors.Wrap(err, "config blocks unsuccessfully generated") + cm.logger.Error(err) + cm.client.Status(proto.StateObserved_FAILED, err.Error()) + return + } - if errs := cm.apply(blocks); !errs.IsEmpty() { - cm.logger.Errorf("Could not apply the configuration, error: %+v", errs) - continue WORKERLOOP - } - case <-cm.done: - return - } + err = uconfig.Unpack(&configMap) + if err != nil { + err = errors.Wrap(err, "config blocks unsuccessfully generated") + cm.logger.Error(err) + cm.client.Status(proto.StateObserved_FAILED, err.Error()) + return + } + + blocks, err := cm.toConfigBlocks(configMap) + if err != nil { + err = errors.Wrap(err, "could not apply the configuration") + cm.logger.Error(err) + cm.client.Status(proto.StateObserved_FAILED, err.Error()) + return + } + + if errs := cm.apply(blocks); !errs.IsEmpty() { + err = errors.Wrap(err, "could not apply the configuration") + cm.logger.Error(err) + cm.client.Status(proto.StateObserved_FAILED, err.Error()) + return + } + + cm.client.Status(proto.StateObserved_HEALTHY, "Running") +} + +func (cm *Manager) OnStop() { + if cm.stopFunc != nil { + cm.client.Status(proto.StateObserved_STOPPING, "Stopping") + cm.stopFunc() } } +func (cm *Manager) OnError(err error) { + cm.logger.Errorf("elastic-agent-client got error: %s", err) +} + func (cm *Manager) apply(blocks api.ConfigBlocks) xmanagement.Errors { var errors xmanagement.Errors missing := map[string]bool{} @@ -267,13 +285,3 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (api.ConfigBlocks, error) { return res, nil } - -func (cm *Manager) startGrpcServer() { - cm.logger.Info("initiating fleet config manager") - s := NewConfigServer(cm.ConfigChan()) - if err := server.NewGrpcServer(os.Stdin, s); err != nil { - panic(err) - } -} - -var _ ConfigManager = &Manager{} diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go index 74770ef424ae..b6b75b373acb 100644 --- a/x-pack/libbeat/management/manager.go +++ b/x-pack/libbeat/management/manager.go @@ -116,7 +116,7 @@ func (cm *ConfigManager) Enabled() bool { } // Start the config manager -func (cm *ConfigManager) Start() { +func (cm *ConfigManager) Start(_ func()) { if !cm.Enabled() { return } diff --git a/x-pack/libbeat/management/manager_test.go b/x-pack/libbeat/management/manager_test.go index af0c8816ebe8..c783989b7082 100644 --- a/x-pack/libbeat/management/manager_test.go +++ b/x-pack/libbeat/management/manager_test.go @@ -88,7 +88,7 @@ func TestConfigManager(t *testing.T) { t.Fatal(err) } - manager.Start() + manager.Start(func() {}) // On first reload we will get apache2 module config1 := <-reloadable.reloaded @@ -164,7 +164,7 @@ func TestRemoveItems(t *testing.T) { t.Fatal(err) } - manager.Start() + manager.Start(func() {}) // On first reload we will get apache2 module config1 := <-reloadable.reloaded @@ -242,7 +242,7 @@ func TestUnEnroll(t *testing.T) { t.Fatal(err) } - manager.Start() + manager.Start(func() {}) // On first reload we will get apache2 module config1 := <-reloadable.reloaded @@ -322,7 +322,7 @@ func TestBadConfig(t *testing.T) { t.Fatal(err) } - manager.Start() + manager.Start(func() {}) // On first reload we will get apache2 module config1 := <-reloadable.reloaded