diff --git a/NOTICE.txt b/NOTICE.txt index f23805c5d87..7bc5103d040 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -602,6 +602,218 @@ Contents of probable licence file $GOMODCACHE/github.com/docker/go-units@v0.4.0/ limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/dolmen-go/contextio +Version: v0.0.0-20200217195037-68fc5150bcd5 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/dolmen-go/contextio@v0.0.0-20200217195037-68fc5150bcd5/LICENSE: + + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/e2e-testing Version: v1.99.2-0.20220117192005-d3365c99b9c4 @@ -1695,11 +1907,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-licenser@v0. -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-sysinfo -Version: v1.7.1 +Version: v1.8.1 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.7.1/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.8.1/LICENSE.txt: Apache License @@ -1907,11 +2119,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/go-sysinfo@v1.7 -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-ucfg -Version: v0.8.5 +Version: v0.8.6 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.5/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-ucfg@v0.8.6/LICENSE: Apache License Version 2.0, January 2004 @@ -5230,11 +5442,11 @@ OF THIS 
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : golang.org/x/sys -Version: v0.0.0-20220422013727-9388b58f7150 +Version: v0.0.0-20220715151400-c0bba94af5f8 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.0.0-20220422013727-9388b58f7150/LICENSE: +Contents of probable licence file $GOMODCACHE/golang.org/x/sys@v0.0.0-20220715151400-c0bba94af5f8/LICENSE: Copyright (c) 2009 The Go Authors. All rights reserved. diff --git a/changelog/fragments/1666095433-service_runtime_v2.yaml b/changelog/fragments/1666095433-service_runtime_v2.yaml new file mode 100644 index 00000000000..f54aa7e5d9c --- /dev/null +++ b/changelog/fragments/1666095433-service_runtime_v2.yaml @@ -0,0 +1,5 @@ +kind: feature +summary: Service runtime for V2 +description: Service runtime for V2, tailored specifically for Endpoint service. 
+pr: 1529 +issue: 1069 diff --git a/go.mod b/go.mod index 245d331130f..df1845dff01 100644 --- a/go.mod +++ b/go.mod @@ -10,14 +10,15 @@ require ( github.com/cavaliercoder/go-rpm v0.0.0-20190131055624-7a9c54e3d83e github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534 github.com/docker/go-units v0.4.0 + github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 github.com/elastic/e2e-testing v1.99.2-0.20220117192005-d3365c99b9c4 github.com/elastic/elastic-agent-autodiscover v0.2.1 github.com/elastic/elastic-agent-client/v7 v7.0.0-20220804181728-b0328d2fe484 github.com/elastic/elastic-agent-libs v0.2.6 github.com/elastic/elastic-agent-system-metrics v0.4.4 github.com/elastic/go-licenser v0.4.0 - github.com/elastic/go-sysinfo v1.7.1 - github.com/elastic/go-ucfg v0.8.5 + github.com/elastic/go-sysinfo v1.8.1 + github.com/elastic/go-ucfg v0.8.6 github.com/gofrs/flock v0.8.1 github.com/gofrs/uuid v4.2.0+incompatible github.com/google/go-cmp v0.5.6 @@ -47,7 +48,7 @@ require ( golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 golang.org/x/tools v0.1.9 google.golang.org/grpc v1.46.0 google.golang.org/protobuf v1.28.0 @@ -152,6 +153,7 @@ require ( replace ( github.com/Microsoft/go-winio => github.com/bi-zone/go-winio v0.4.15 + github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3 github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c diff --git a/go.sum b/go.sum index f8fb1ecc1a7..73ded2d2cf3 100644 --- a/go.sum +++ b/go.sum @@ -374,6 +374,8 @@ github.com/docker/go-units 
v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= +github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5 h1:BzN9o4IS1Hj+AM5qDggsfMDQGFXau5KagipEFmnyIbc= +github.com/dolmen-go/contextio v0.0.0-20200217195037-68fc5150bcd5/go.mod h1:cxc20xI7fOgsFHWgt+PenlDDnMcrvh7Ocuj5hEFIdEk= github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5/go.mod h1:qssHWj60/X5sZFNxpG4HBPDHVqxNm4DfnCKgrbZOT+s= github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= @@ -398,11 +400,13 @@ github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tld github.com/elastic/go-structform v0.0.9 h1:HpcS7xljL4kSyUfDJ8cXTJC6rU5ChL1wYb6cx3HLD+o= github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= -github.com/elastic/go-sysinfo v1.7.1 h1:Wx4DSARcKLllpKT2TnFVdSUJOsybqMYCNQZq1/wO+s0= github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= +github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4= +github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM= github.com/elastic/go-ucfg v0.8.4/go.mod h1:4E8mPOLSUV9hQ7sgLEJ4bvt0KhMuDJa8joDT2QGAEKA= -github.com/elastic/go-ucfg v0.8.5 h1:4GB/rMpuh7qTcSFaxJUk97a/JyvFzhi6t+kaskTTLdM= github.com/elastic/go-ucfg v0.8.5/go.mod h1:4E8mPOLSUV9hQ7sgLEJ4bvt0KhMuDJa8joDT2QGAEKA= 
+github.com/elastic/go-ucfg v0.8.6 h1:stUeyh2goTgGX+/wb9gzKvTv0YB0231LTpKUgCKj4U0= +github.com/elastic/go-ucfg v0.8.6/go.mod h1:4E8mPOLSUV9hQ7sgLEJ4bvt0KhMuDJa8joDT2QGAEKA= github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= @@ -1567,8 +1571,8 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index 327138ac67a..75435cf8e45 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -104,7 +104,9 @@ func New( log.Info("Parsed configuration and determined agent is managed by Fleet") composableManaged = true - 
compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server)) + compModifiers = append(compModifiers, FleetServerComponentModifier(cfg.Fleet.Server), + EndpointComponentModifier(cfg.Fleet)) + managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime) if err != nil { return nil, err @@ -130,6 +132,7 @@ func New( func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) { path := paths.AgentConfigFile() store := storage.NewEncryptedDiskStore(path) + reader, err := store.Load() if err != nil { return store, nil, errors.New(err, "could not initialize config store", @@ -161,6 +164,11 @@ func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.C errors.M(errors.MetaKeyPath, path)) } + // Fix up fleet.agent.id otherwise the fleet.agent.id is empty string + if cfg.Settings != nil && cfg.Fleet != nil && cfg.Fleet.Info != nil && cfg.Fleet.Info.ID == "" { + cfg.Fleet.Info.ID = cfg.Settings.ID + } + if err := cfg.Fleet.Valid(); err != nil { return store, nil, errors.New(err, "fleet configuration is invalid", diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 6c68e5cf610..8fd41b27b7f 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -137,7 +137,7 @@ type VarsManager interface { // ComponentsModifier is a function that takes the computed components model and modifies it before // passing it into the components runtime manager. -type ComponentsModifier func(comps []component.Component) ([]component.Component, error) +type ComponentsModifier func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) // State provides the current state of the coordinator along with all the current states of components and units. 
type State struct { @@ -670,7 +670,7 @@ func (c *Coordinator) compute() (map[string]interface{}, []component.Component, } for _, modifier := range c.modifiers { - comps, err = modifier(comps) + comps, err = modifier(comps, cfg) if err != nil { return nil, nil, fmt.Errorf("failed to modify components: %w", err) } diff --git a/internal/pkg/agent/application/fleet_server_bootstrap.go b/internal/pkg/agent/application/fleet_server_bootstrap.go index 369c63bf53b..87d9febaf87 100644 --- a/internal/pkg/agent/application/fleet_server_bootstrap.go +++ b/internal/pkg/agent/application/fleet_server_bootstrap.go @@ -21,6 +21,7 @@ import ( const ( elasticsearch = "elasticsearch" fleetServer = "fleet-server" + endpoint = "endpoint" ) // injectFleetServerInput is the base configuration that is used plus the FleetServerComponentModifier that adjusts @@ -43,7 +44,7 @@ var injectFleetServerInput = config.MustNewConfigFrom(map[string]interface{}{ // FleetServerComponentModifier modifies the comps to inject extra information from the policy into // the Fleet Server component and units needed to run Fleet Server correctly. func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) coordinator.ComponentsModifier { - return func(comps []component.Component) ([]component.Component, error) { + return func(comps []component.Component, _ map[string]interface{}) ([]component.Component, error) { for i, comp := range comps { if comp.Spec.InputType == fleetServer { for j, unit := range comp.Units { @@ -82,6 +83,44 @@ func FleetServerComponentModifier(serverCfg *configuration.FleetServerConfig) co } } +// EndpointComponentModifier the modifier for the Endpoint configuration. +// The Endpoint expects the fleet configuration passed to it by the Agent +// because it needs to be able to connect to the fleet server directly. 
+func EndpointComponentModifier(fleetCfg *configuration.FleetAgentConfig) coordinator.ComponentsModifier { + return func(comps []component.Component, cfg map[string]interface{}) ([]component.Component, error) { + for i, comp := range comps { + if comp.Spec.InputType == endpoint { + for j, unit := range comp.Units { + if unit.Type == client.UnitTypeInput && unit.Config.Type == endpoint { + unitCfgMap, err := toMapStr(unit.Config.Source.AsMap(), map[string]interface{}{"fleet": fleetCfg}) + if err != nil { + return nil, err + } + // Set host.id for the host, assign the host from the top level config + // Endpoint expects this + // "host": { + // "id": "b62e91be682a4108bbb080152cc5eeac" + // }, + if v, ok := unitCfgMap["fleet"]; ok { + if m, ok := v.(map[string]interface{}); ok { + m["host"] = cfg["host"] + } + } + unitCfg, err := component.ExpectedConfig(unitCfgMap) + if err != nil { + return nil, err + } + unit.Config = unitCfg + } + comp.Units[j] = unit + } + } + comps[i] = comp + } + return comps, nil + } +} + type fleetServerBootstrapManager struct { log *logger.Logger diff --git a/internal/pkg/agent/application/info/inject_config.go b/internal/pkg/agent/application/info/inject_config.go index ced41d9f217..03f57a6ddcd 100644 --- a/internal/pkg/agent/application/info/inject_config.go +++ b/internal/pkg/agent/application/info/inject_config.go @@ -42,6 +42,9 @@ func agentGlobalConfig() (map[string]interface{}, error) { "home": paths.Home(), "logs": paths.Logs(), }, + "host": map[string]interface{}{ + "id": hostInfo.Info().UniqueID, + }, "runtime.os": runtime.GOOS, "runtime.arch": runtime.GOARCH, "runtime.osinfo.type": hostInfo.Info().OS.Type, diff --git a/internal/pkg/agent/cmd/container.go b/internal/pkg/agent/cmd/container.go index baa21918695..06fd9bdf962 100644 --- a/internal/pkg/agent/cmd/container.go +++ b/internal/pkg/agent/cmd/container.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/url" "os" "os/exec" @@ -231,12 +230,12 @@ func 
containerCmd(streams *cli.IOStreams) error { wg.Done() // sending kill signal to current process (elastic-agent) logInfo(streams, "Initiate shutdown elastic-agent.") - mainProc.Signal(syscall.SIGTERM) // nolint:errcheck //not required + mainProc.Signal(syscall.SIGTERM) //nolint:errcheck //not required }() defer func() { if apmProc != nil { - apmProc.Stop() // nolint:errcheck //not required + apmProc.Stop() //nolint:errcheck //not required logInfo(streams, "Initiate shutdown legacy apm-server.") } }() @@ -722,7 +721,7 @@ func runLegacyAPMServer(streams *cli.IOStreams, path string) (*process.Info, err } // Get the apm-server directory - files, err := ioutil.ReadDir(path) + files, err := os.ReadDir(path) if err != nil { return nil, errors.New(err, fmt.Sprintf("reading directory %s", path)) } @@ -750,7 +749,7 @@ func runLegacyAPMServer(streams *cli.IOStreams, path string) (*process.Info, err addEnv("--httpprof", "HTTPPROF") addSettingEnv("gc_percent", "APMSERVER_GOGC") logInfo(streams, "Starting legacy apm-server daemon as a subprocess.") - return process.Start(spec.BinaryPath, os.Geteuid(), os.Getegid(), args, nil) + return process.Start(spec.BinaryPath, process.WithArgs(args)) } func logToStderr(cfg *configuration.Configuration) { diff --git a/internal/pkg/agent/cmd/enroll_cmd.go b/internal/pkg/agent/cmd/enroll_cmd.go index 77519772fe7..805b8a47757 100644 --- a/internal/pkg/agent/cmd/enroll_cmd.go +++ b/internal/pkg/agent/cmd/enroll_cmd.go @@ -9,7 +9,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "math/rand" "os" "os/exec" @@ -287,7 +286,7 @@ func (c *enrollCmd) writeDelayEnroll(streams *cli.IOStreams) error { errors.TypeConfig, errors.M("path", enrollPath)) } - err = ioutil.WriteFile(enrollPath, data, 0600) + err = os.WriteFile(enrollPath, data, 0600) if err != nil { return errors.New( err, @@ -598,12 +597,15 @@ func (c *enrollCmd) startAgent(ctx context.Context) (<-chan *os.ProcessState, er if !paths.IsVersionHome() { args = append(args, 
"--path.home.unversioned") } - proc, err := process.StartContext( - ctx, cmd, os.Geteuid(), os.Getegid(), args, nil, func(c *exec.Cmd) error { + proc, err := process.Start( + cmd, + process.WithContext(ctx), + process.WithArgs(args), + process.WithCmdOptions(func(c *exec.Cmd) error { c.Stdout = os.Stdout c.Stderr = os.Stderr return nil - }) + })) if err != nil { return nil, err } @@ -632,7 +634,7 @@ func yamlToReader(in interface{}) (io.Reader, error) { } func delay(ctx context.Context, d time.Duration) { - t := time.NewTimer(time.Duration(rand.Int63n(int64(d)))) //nolint:gosec // the RNG is allowed to be weak + t := time.NewTimer(time.Duration(rand.Int63n(int64(d)))) defer t.Stop() select { case <-ctx.Done(): diff --git a/internal/pkg/agent/configuration/settings.go b/internal/pkg/agent/configuration/settings.go index 3b509270344..878f4cbfe85 100644 --- a/internal/pkg/agent/configuration/settings.go +++ b/internal/pkg/agent/configuration/settings.go @@ -14,6 +14,7 @@ import ( // SettingsConfig is an collection of agent settings configuration. 
type SettingsConfig struct { + ID string `yaml:"id" config:"id" json:"id"` DownloadConfig *artifact.Config `yaml:"download" config:"download" json:"download"` ProcessConfig *process.Config `yaml:"process" config:"process" json:"process"` GRPC *GRPCConfig `yaml:"grpc" config:"grpc" json:"grpc"` diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index fd99c3bbb82..ef62524455f 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/config" "github.com/elastic/elastic-agent/internal/pkg/config/operations" "github.com/elastic/elastic-agent/pkg/component" + comprt "github.com/elastic/elastic-agent/pkg/component/runtime" "github.com/elastic/elastic-agent/pkg/core/logger" ) @@ -149,7 +150,7 @@ func uninstallComponents(ctx context.Context, cfgFile string) error { // remove each service component for _, comp := range comps { - if err := uninstallComponent(ctx, comp); err != nil { + if err := uninstallComponent(ctx, log, comp); err != nil { os.Stderr.WriteString(fmt.Sprintf("failed to uninstall component %q: %s\n", comp.ID, err)) } } @@ -157,9 +158,8 @@ func uninstallComponents(ctx context.Context, cfgFile string) error { return nil } -func uninstallComponent(_ context.Context, _ component.Component) error { - // TODO(blakerouse): Perform uninstall of service component; once the service runtime is written. 
- return errors.New("failed to uninstall component; not implemented") +func uninstallComponent(ctx context.Context, log *logp.Logger, comp component.Component) error { + return comprt.UninstallService(ctx, log, comp) } func serviceComponentsFromConfig(specs component.RuntimeSpecs, cfg *config.Config) ([]component.Component, error) { diff --git a/internal/pkg/agent/storage/store/state_store.go b/internal/pkg/agent/storage/store/state_store.go index 522e46fdade..3e90189c55a 100644 --- a/internal/pkg/agent/storage/store/state_store.go +++ b/internal/pkg/agent/storage/store/state_store.go @@ -148,7 +148,6 @@ func migrateStateStore(log *logger.Logger, actionStorePath, stateStorePath strin stateStoreExits, err := stateDiskStore.Exists() if err != nil { - log.With() log.Errorf("failed to check if state store %s exists: %v", stateStorePath, err) return err } diff --git a/pkg/component/component.go b/pkg/component/component.go index f77b27d6fcf..467fede9e40 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -108,6 +108,7 @@ func (r *RuntimeSpecs) ToComponents(policy map[string]interface{}, monitoringInj } func (r *RuntimeSpecs) PolicyToComponents(policy map[string]interface{}) ([]Component, map[string]string, error) { + const revision = "revision" outputsMap, err := toIntermediate(policy) if err != nil { return nil, nil, err @@ -180,6 +181,11 @@ func (r *RuntimeSpecs) PolicyToComponents(policy map[string]interface{}) ([]Comp // skip; not enabled continue } + if v, ok := policy[revision]; ok { + input.input["policy"] = map[string]interface{}{ + revision: v, + } + } cfg, cfgErr := ExpectedConfig(input.input) if cfg != nil { cfg.Type = inputType // ensure alias is replaced in the ExpectedConfig to be non-alias type diff --git a/pkg/component/runtime/README.md b/pkg/component/runtime/README.md new file mode 100644 index 00000000000..0622c911f76 --- /dev/null +++ b/pkg/component/runtime/README.md @@ -0,0 +1,28 @@ +# Runtime documentation + + +## Service 
runtime + +This part of the documentation describes how the Agent ```service runtime``` works. The design is not new and was inherited from V1; it just was not documented anywhere. + +The service runtime is currently used to support integration with the Endpoint service and is very much customized to the expected behavior of the service. The Endpoint service cannot be stopped (protected on Windows) and the Agent runtime component is not expected to manage the lifetime of the service. The Endpoint service is expected to be always running. + +In order for the Endpoint to connect to the Agent, the Agent starts up the gRPC "connection info" server on the local port specified in the endpoint specification file. The "connection info" service sends the connection parameters/credentials to the Endpoint upon connection; the Endpoint uses them to establish the primary connection to the Agent. + +The following are the steps the Endpoint goes through to establish the connection to the Agent: +1. The Endpoint connects to the "connection info" local port +2. The Agent sends the connection parameters/credentials to the Endpoint and closes the connection +3. The Endpoint establishes the primary connection to the Agent + +The Agent can only call 3 commands on the endpoint binary that allow it to: +1. Check if the Endpoint service is installed +2. Install the Endpoint service. The Endpoint service is started automatically upon installation. +3. Uninstall the Endpoint service. + + +The Agent is expected to send ```STOPPING``` state to the Endpoint if possible. This helps to ```deactivate``` the Endpoint in the k8s environment, for example. + +When the Endpoint is removed from the policy, the Endpoint is uninstalled by the Agent as follows: +1. If the Endpoint has never checked in, the Agent waits, with a timeout, for the first check-in +2. The Agent sends ```STOPPING``` state to the Endpoint +3. 
The Agent calls uninstall command based on the service specification \ No newline at end of file diff --git a/pkg/component/runtime/command.go b/pkg/component/runtime/command.go index 2cabe906b1f..22c1898fcdc 100644 --- a/pkg/component/runtime/command.go +++ b/pkg/component/runtime/command.go @@ -33,6 +33,8 @@ const ( envAgentComponentID = "AGENT_COMPONENT_ID" envAgentComponentInputType = "AGENT_COMPONENT_INPUT_TYPE" + + stateUnknownMessage = "Unknown" ) type MonitoringManager interface { @@ -83,8 +85,8 @@ func NewCommandRuntime(comp component.Component, monitor MonitoringManager) (Com // Run starts the runtime for the component. // -// Called by Manager inside a go-routine. Run should not return until the passed in context is done. Run is always -// called before any of the other methods in the interface and once the context is done none of those methods will +// Called by Manager inside a goroutine. Run does not return until the passed in context is done. Run is always +// called before any of the other methods in the interface and once the context is done none of those methods should // ever be called again. func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error { checkinPeriod := c.current.Spec.Spec.Command.Timeouts.Checkin @@ -243,7 +245,7 @@ func (c *CommandRuntime) forceCompState(state client.UnitState, msg string) { // compState updates just the component state not all the units. 
func (c *CommandRuntime) compState(state client.UnitState) { - msg := "Unknown" + msg := stateUnknownMessage if state == client.UnitStateHealthy { msg = fmt.Sprintf("Healthy: communicating with pid '%d'", c.proc.PID) } else if state == client.UnitStateDegraded { @@ -298,7 +300,10 @@ func (c *CommandRuntime) start(comm Communicator) error { _ = os.MkdirAll(dataPath, 0755) args = append(args, "-E", "path.data="+dataPath) - proc, err := process.Start(path, uid, gid, args, env, attachOutErr, dirPath(workDir)) + proc, err := process.Start(path, + process.WithArgs(args), + process.WithEnv(env), + process.WithCmdOptions(attachOutErr, dirPath(workDir))) if err != nil { return err } @@ -410,7 +415,7 @@ func attachOutErr(cmd *exec.Cmd) error { return nil } -func dirPath(path string) process.Option { +func dirPath(path string) process.CmdOption { return func(cmd *exec.Cmd) error { cmd.Dir = path return nil diff --git a/pkg/component/runtime/conn_info_server.go b/pkg/component/runtime/conn_info_server.go new file mode 100644 index 00000000000..f3e2a0a6fe0 --- /dev/null +++ b/pkg/component/runtime/conn_info_server.go @@ -0,0 +1,88 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package runtime + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "time" + + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +const ( + defaultStopTimeout = 15 * time.Second + windows = "windows" +) + +type connInfoServer struct { + log *logger.Logger + listener net.Listener + waitCtx context.Context + stopTimeout time.Duration +} + +func newConnInfoServer(log *logger.Logger, comm Communicator, port int) (*connInfoServer, error) { + listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + return nil, fmt.Errorf("failed to start connection credentials listener: %w", err) + } + + s := &connInfoServer{log: log, listener: listener, stopTimeout: defaultStopTimeout} + + var cn context.CancelFunc + s.waitCtx, cn = context.WithCancel(context.Background()) + go func() { + defer cn() + for { + conn, err := listener.Accept() + if err != nil { + log.Errorf("failed accept conn info connection: %v", err) + break + } + log.Debugf("client connected, sending connection info") + err = comm.WriteConnInfo(conn) + if err != nil { + if !errors.Is(err, io.EOF) { + log.Errorf("failed write conn info: %v", err) + } + } + err = conn.Close() + if err != nil { + log.Errorf("failed conn info connection close: %v", err) + } + } + }() + + return s, nil +} + +func (s *connInfoServer) stop() error { + // wait service stop with timeout + ctx, cn := context.WithTimeout(s.waitCtx, s.stopTimeout) + defer cn() + + err := s.listener.Close() + if err != nil { + s.log.Errorf("failed close conn info connection: %v", err) + } + + <-ctx.Done() + cerr := ctx.Err() + if errors.Is(cerr, context.Canceled) { + cerr = nil + } + + if errors.Is(cerr, context.DeadlineExceeded) { + s.log.Errorf("timeout while stopping conn info server: %v", err) + } + if err != nil { + return err + } + return cerr +} diff --git a/pkg/component/runtime/conn_info_server_test.go b/pkg/component/runtime/conn_info_server_test.go new file mode 100644 index 00000000000..4b221a64930 --- 
/dev/null +++ b/pkg/component/runtime/conn_info_server_test.go @@ -0,0 +1,260 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "runtime" + "syscall" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + protobuf "google.golang.org/protobuf/proto" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent/internal/pkg/testutils" +) + +type mockCommunicator struct { + ch chan *proto.CheckinObserved + connInfo *proto.ConnInfo +} + +func newMockCommunicator() *mockCommunicator { + return &mockCommunicator{ + ch: make(chan *proto.CheckinObserved, 1), + connInfo: &proto.ConnInfo{ + Addr: getAddress(), + ServerName: "endpoint", + Token: "some token", + CaCert: []byte("some CA cert"), + PeerCert: []byte("some cert"), + PeerKey: []byte("some key"), + Services: []proto.ConnInfoServices{proto.ConnInfoServices_CheckinV2}, + }, + } +} + +func (c *mockCommunicator) WriteConnInfo(w io.Writer, services ...client.Service) error { + infoBytes, err := protobuf.Marshal(c.connInfo) + if err != nil { + return fmt.Errorf("failed to marshal connection information: %w", err) + } + _, err = w.Write(infoBytes) + if err != nil { + return fmt.Errorf("failed to write connection information: %w", err) + } + + return nil +} + +func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected) { +} + +func (c *mockCommunicator) CheckinObserved() <-chan *proto.CheckinObserved { + return c.ch +} + +const testPort = 6788 + +func getAddress() string { + return fmt.Sprintf("127.0.0.1:%d", testPort) +} + +func TestConnInfoNormal(t *testing.T) { + log := testutils.NewErrorLogger(t) + + comm := 
newMockCommunicator() + + // Start server + srv, err := newConnInfoServer(log, comm, testPort) + if err != nil { + t.Fatal(err) + } + defer func() { + err := srv.stop() + if err != nil { + t.Fatal(err) + } + }() + + const count = 2 // read connection info a couple of times to make sure the server keeps working for multiple calls + + for i := 0; i < count; i++ { + // Connect to the server + conn, err := net.Dial("tcp", getAddress()) + if err != nil { + t.Fatal(err) + } + + b, err := io.ReadAll(conn) + if err != nil { + t.Fatal(err) + } + + var connInfo proto.ConnInfo + err = protobuf.Unmarshal(b, &connInfo) + if err != nil { + t.Fatal(err) + } + + // Check the received result + diff := cmp.Diff(&connInfo, comm.connInfo, cmpopts.IgnoreUnexported(proto.ConnInfo{})) + if diff != "" { + t.Error(diff) + } + } +} + +func TestConnInfoConnCloseThenAnotherConn(t *testing.T) { + log := testutils.NewErrorLogger(t) + + comm := newMockCommunicator() + + // Start server + srv, err := newConnInfoServer(log, comm, testPort) + if err != nil { + t.Fatal(err) + } + defer func() { + err := srv.stop() + if err != nil { + t.Fatal(err) + } + }() + + // Connect to the server + conn, err := net.Dial("tcp", getAddress()) + if err != nil { + t.Fatal(err) + } + + // Close connection + err = conn.Close() + if err != nil { + t.Fatal(err) + } + + // Connect again after closed + conn, err = net.Dial("tcp", getAddress()) + if err != nil { + t.Fatal(err) + } + + b, err := io.ReadAll(conn) + if err != nil { + t.Fatal(err) + } + + var connInfo proto.ConnInfo + err = protobuf.Unmarshal(b, &connInfo) + if err != nil { + t.Fatal(err) + } + + // Check the received result + diff := cmp.Diff(&connInfo, comm.connInfo, cmpopts.IgnoreUnexported(proto.ConnInfo{})) + if diff != "" { + t.Error(diff) + } +} + +func TestConnInfoClosed(t *testing.T) { + log := testutils.NewErrorLogger(t) + + comm := newMockCommunicator() + + // Start server + srv, err := newConnInfoServer(log, comm, testPort) + if err != nil { + 
t.Fatal(err) + } + + err = srv.stop() + if err != nil { + t.Fatal(err) + } + + _, err = net.Dial("tcp", getAddress()) + if err == nil { + t.Fatal("want non-nil err") + } + + // There is no good way to check on connection refused error cross-platform + // On windows we get windows.WSAECONNREFUSED on *nix we get syscall.ECONNREFUSED + // Importing the golang.org/x/sys/windows in here in order to get access to windows.WSAECONNREFUSED + // causes issue for *nix builds: "imports golang.org/x/sys/windows: build constraints exclude all Go files". + // In order to avoid creating extra plaform specific files compare just errno for this test. + wantErrNo := int(syscall.ECONNREFUSED) + if runtime.GOOS == windows { + wantErrNo = 10061 // windows.WSAECONNREFUSED + } + var ( + syserr syscall.Errno + errno int + ) + if errors.As(err, &syserr) { + errno = int(syserr) + if wantErrNo != errno { + t.Fatal(err) + } + } else { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestConnInfoDoubleStop(t *testing.T) { + log := testutils.NewErrorLogger(t) + + comm := newMockCommunicator() + + // Start server + srv, err := newConnInfoServer(log, comm, testPort) + if err != nil { + t.Fatal(err) + } + + err = srv.stop() + if err != nil { + t.Fatal(err) + } + + err = srv.stop() + if err == nil { + t.Fatal("want err, got nil ") + } +} + +func TestConnInfoStopTimeout(t *testing.T) { + log := testutils.NewErrorLogger(t) + + comm := newMockCommunicator() + + // Start server + srv, err := newConnInfoServer(log, comm, testPort) + if err != nil { + t.Fatal(err) + } + + // inject the context for wait that we can control to emulate timeout + var cn context.CancelFunc + srv.waitCtx, cn = context.WithCancel(context.Background()) + defer cn() + + srv.stopTimeout = 100 * time.Millisecond + + err = srv.stop() + // Expected timeout on stop + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatal(err) + } +} diff --git a/pkg/component/runtime/runtime.go b/pkg/component/runtime/runtime.go index 
58c7d1ed153..e06702b2141 100644 --- a/pkg/component/runtime/runtime.go +++ b/pkg/component/runtime/runtime.go @@ -21,8 +21,8 @@ import ( type ComponentRuntime interface { // Run starts the runtime for the component. // - // Called by Manager inside a go-routine. Run should not return until the passed in context is done. Run is always - // called before any of the other methods in the interface and once the context is done none of those methods will + // Called by Manager inside a goroutine. Run does not return until the passed in context is done. Run is always + // called before any of the other methods in the interface and once the context is done none of those methods should // ever be called again. Run(ctx context.Context, comm Communicator) error // Watch returns the channel that sends component state. @@ -54,13 +54,13 @@ type ComponentRuntime interface { } // NewComponentRuntime creates the proper runtime based on the input specification for the component. -func NewComponentRuntime(comp component.Component, monitor MonitoringManager) (ComponentRuntime, error) { +func NewComponentRuntime(comp component.Component, logger *logger.Logger, monitor MonitoringManager) (ComponentRuntime, error) { if comp.Err != nil { return NewFailedRuntime(comp) } else if comp.Spec.Spec.Command != nil { return NewCommandRuntime(comp, monitor) } else if comp.Spec.Spec.Service != nil { - return nil, errors.New("service component runtime not implemented") + return NewServiceRuntime(comp, logger) } return nil, errors.New("unknown component runtime") } @@ -87,7 +87,7 @@ func newComponentRuntimeState(m *Manager, logger *logger.Logger, monitor Monitor if err != nil { return nil, err } - runtime, err := NewComponentRuntime(comp, monitor) + runtime, err := NewComponentRuntime(comp, logger, monitor) if err != nil { return nil, err } diff --git a/pkg/component/runtime/service.go b/pkg/component/runtime/service.go new file mode 100644 index 00000000000..9c55b9fa4d7 --- /dev/null +++ 
b/pkg/component/runtime/service.go @@ -0,0 +1,438 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/kardianos/service" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +const ( + defaultCheckServiceStatusInterval = 30 * time.Second // 30 seconds default for now, consistent with the command check-in interval +) + +var ( + ErrOperationSpecUndefined = errors.New("operation spec undefined") + ErrInvalidServiceSpec = errors.New("invalid service spec") +) + +type executeServiceCommandFunc func(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error + +// ServiceRuntime provides the command runtime for running a component as a service. +type ServiceRuntime struct { + comp component.Component + log *logger.Logger + + ch chan ComponentState + actionCh chan actionMode + compCh chan component.Component + statusCh chan service.Status + + state ComponentState + + executeServiceCommandImpl executeServiceCommandFunc +} + +// NewServiceRuntime creates a new command runtime for the provided component. 
+func NewServiceRuntime(comp component.Component, logger *logger.Logger) (ComponentRuntime, error) { + if comp.Spec.Spec.Service == nil { + return nil, errors.New("must have service defined in specification") + } + + state := newComponentState(&comp) + + s := &ServiceRuntime{ + comp: comp, + log: logger.Named("service_runtime"), + ch: make(chan ComponentState), + actionCh: make(chan actionMode), + compCh: make(chan component.Component), + statusCh: make(chan service.Status), + state: state, + executeServiceCommandImpl: executeServiceCommand, + } + + // Set initial state as STOPPED + s.state.compState(client.UnitStateStopped, fmt.Sprintf("Stopped: %s service", s.name())) + return s, nil +} + +// Run starts the runtime for the component. +// +// Called by Manager inside a goroutine. Run does not return until the passed in context is done. Run is always +// called before any of the other methods in the interface and once the context is done none of those methods should +// ever be called again. +func (s *ServiceRuntime) Run(ctx context.Context, comm Communicator) (err error) { + checkinTimer := time.NewTimer(s.checkinPeriod()) + defer checkinTimer.Stop() + + // Stop the check-ins timer initially + checkinTimer.Stop() + + var ( + cis *connInfoServer + lastCheckin time.Time + missedCheckins int + ) + + cisStop := func() { + if cis != nil { + _ = cis.stop() + cis = nil + } + } + defer cisStop() + + for { + var err error + select { + case <-ctx.Done(): + s.log.Debug("context is done. 
exiting.") + return ctx.Err() + case as := <-s.actionCh: + switch as { + case actionStart: + // Initial state on start + lastCheckin = time.Time{} + missedCheckins = 0 + checkinTimer.Stop() + cisStop() + + // Start connection info + if cis == nil { + cis, err = newConnInfoServer(s.log, comm, s.comp.Spec.Spec.Service.CPort) + if err != nil { + err = fmt.Errorf("failed to start connection info service %s: %w", s.name(), err) + break + } + } + + // Start service + err = s.start(ctx) + if err != nil { + cisStop() + break + } + + // Start check-in timer + checkinTimer.Reset(s.checkinPeriod()) + case actionStop, actionTeardown: + // Stop check-in timer + s.log.Debugf("stop check-in timer for %s service", s.name()) + checkinTimer.Stop() + + // Stop connection info + s.log.Debugf("stop connection info for %s service", s.name()) + cisStop() + + // Stop service + s.stop(ctx, comm, lastCheckin, as == actionTeardown) + } + if err != nil { + s.forceCompState(client.UnitStateFailed, err.Error()) + } + case newComp := <-s.compCh: + s.processNewComp(newComp, comm) + case checkin := <-comm.CheckinObserved(): + s.processCheckin(checkin, comm, &lastCheckin) + case <-checkinTimer.C: + s.checkStatus(s.checkinPeriod(), &lastCheckin, &missedCheckins) + checkinTimer.Reset(s.checkinPeriod()) + } + } +} + +func (s *ServiceRuntime) start(ctx context.Context) (err error) { + name := s.name() + + // Set state to starting + s.forceCompState(client.UnitStateStarting, fmt.Sprintf("Starting: %s service runtime", name)) + + // Call the check command of the service + s.log.Debugf("check if %s service is installed", name) + err = s.check(ctx) + s.log.Debugf("after check if %s service is installed, err: %v", name, err) + if err != nil { + // Check failed, call the install command of the service + s.log.Debugf("failed check %s service: %v, try install", name, err) + err = s.install(ctx) + if err != nil { + return fmt.Errorf("failed install %s service: %w", name, err) + } + } + + // The service should 
start on it's own, expecting check-ins + return nil +} + +func (s *ServiceRuntime) stop(ctx context.Context, comm Communicator, lastCheckin time.Time, teardown bool) { + name := s.name() + + s.log.Debugf("stopping %s service runtime", name) + + checkedIn := !lastCheckin.IsZero() + + if teardown { + // If checked in before, send STOPPING + if s.isRunning() { + // If never checked in await for the checkin with the timeout + if !checkedIn { + timeout := s.checkinPeriod() + s.log.Debugf("%s service had never checked in, await for check-in for %v", name, timeout) + checkedIn = s.awaitCheckin(ctx, comm, timeout) + } + + // Received check in send STOPPING + if checkedIn { + s.log.Debugf("send stopping state to %s service", name) + s.state.forceExpectedState(client.UnitStateStopping) + comm.CheckinExpected(s.state.toCheckinExpected()) + } else { + s.log.Debugf("%s service had never checked in, proceed to uninstall", name) + } + } + + s.log.Debug("uninstall %s service", name) + err := s.uninstall(ctx) + if err != nil { + s.log.Errorf("failed %s service uninstall, err: %v", name, err) + } + } + + // Force component stopped state + s.log.Debug("set %s service runtime to stopped state", name) + s.forceCompState(client.UnitStateStopped, fmt.Sprintf("Stopped: %s service runtime", name)) +} + +// awaitCheckin awaits checkin with timeout. 
+func (s *ServiceRuntime) awaitCheckin(ctx context.Context, comm Communicator, timeout time.Duration) bool { + name := s.name() + t := time.NewTimer(timeout) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + // stop cancelled + s.log.Debugf("stopping %s service, cancelled", name) + return false + case <-t.C: + // stop timed out + s.log.Debugf("stopping %s service, timed out", name) + return false + case <-comm.CheckinObserved(): + return true + } + } +} + +func (s *ServiceRuntime) processNewComp(newComp component.Component, comm Communicator) { + s.log.Debugf("observed component update for %s service", s.name()) + sendExpected := s.state.syncExpected(&newComp) + changed := s.state.syncUnits(&newComp) + if sendExpected || s.state.unsettled() { + comm.CheckinExpected(s.state.toCheckinExpected()) + } + if changed { + s.sendObserved() + } +} + +func (s *ServiceRuntime) processCheckin(checkin *proto.CheckinObserved, comm Communicator, lastCheckin *time.Time) { + name := s.name() + + s.log.Debugf("observed check-in for %s service: %v", name, checkin) + sendExpected := false + changed := false + + if s.state.State == client.UnitStateStarting { + // first observation after start, set component to healthy + s.state.State = client.UnitStateHealthy + s.state.Message = fmt.Sprintf("Healthy: communicating with %s service", name) + changed = true + } + + if !s.isRunning() { + return + } + + if lastCheckin.IsZero() { + // first check-in + sendExpected = true + } + *lastCheckin = time.Now().UTC() + if s.state.syncCheckin(checkin) { + changed = true + } + if s.state.unsettled() { + sendExpected = true + } + if sendExpected { + comm.CheckinExpected(s.state.toCheckinExpected()) + } + if changed { + s.sendObserved() + } + if s.state.cleanupStopped() { + s.sendObserved() + } +} + +// isRunning returns true is the service is running +func (s *ServiceRuntime) isRunning() bool { + return s.state.State != client.UnitStateStopping && + s.state.State != client.UnitStateStopped +} 
+ +// checkStatus checks check-ins state, called on timer +func (s *ServiceRuntime) checkStatus(checkinPeriod time.Duration, lastCheckin *time.Time, missedCheckins *int) { + if s.isRunning() { + now := time.Now().UTC() + if lastCheckin.IsZero() { + // never checked-in + *missedCheckins++ + } else if now.Sub(*lastCheckin) > checkinPeriod { + // missed check-in during required period + *missedCheckins++ + } else if now.Sub(*lastCheckin) <= checkinPeriod { + *missedCheckins = 0 + } + if *missedCheckins == 0 { + s.compState(client.UnitStateHealthy, *missedCheckins) + } else if *missedCheckins > 0 && *missedCheckins < maxCheckinMisses { + s.compState(client.UnitStateDegraded, *missedCheckins) + } else if *missedCheckins >= maxCheckinMisses { + // something is wrong; the service should be checking in + msg := fmt.Sprintf("Failed: %s service missed %d check-ins", s.name(), maxCheckinMisses) + s.forceCompState(client.UnitStateFailed, msg) + } + } +} + +func (s *ServiceRuntime) checkinPeriod() time.Duration { + checkinPeriod := s.comp.Spec.Spec.Service.Timeouts.Checkin + if checkinPeriod == 0 { + checkinPeriod = defaultCheckServiceStatusInterval + } + return checkinPeriod +} + +// Watch returns a channel to watch for component state changes. +// +// A new state is sent anytime the state for a unit or the whole component changes. +func (s *ServiceRuntime) Watch() <-chan ComponentState { + return s.ch +} + +// Start starts the service. +// +// Non-blocking and never returns an error. +func (s *ServiceRuntime) Start() error { + s.actionCh <- actionStart + return nil +} + +// Update updates the currComp runtime with a new-revision for the component definition. +// +// Non-blocking and never returns an error. +func (s *ServiceRuntime) Update(comp component.Component) error { + s.compCh <- comp + return nil +} + +// Stop stops the service. +// +// Non-blocking and never returns an error. 
+func (s *ServiceRuntime) Stop() error { + s.actionCh <- actionStop + return nil +} + +// Teardown stop and uninstall the service. +// +// Non-blocking and never returns an error. +func (s *ServiceRuntime) Teardown() error { + s.actionCh <- actionTeardown + return nil +} + +func (s *ServiceRuntime) forceCompState(state client.UnitState, msg string) { + if s.state.forceState(state, msg) { + s.sendObserved() + } +} + +func (s *ServiceRuntime) sendObserved() { + s.ch <- s.state.Copy() +} + +func (s *ServiceRuntime) compState(state client.UnitState, missedCheckins int) { + name := s.name() + msg := stateUnknownMessage + if state == client.UnitStateHealthy { + msg = fmt.Sprintf("Healthy: communicating with %s service", name) + } else if state == client.UnitStateDegraded { + if missedCheckins == 1 { + msg = fmt.Sprintf("Degraded: %s service missed 1 check-in", name) + } else { + msg = fmt.Sprintf("Degraded: %s missed %d check-ins", name, missedCheckins) + } + } + if s.state.compState(state, msg) { + s.sendObserved() + } +} + +func (s *ServiceRuntime) name() string { + return s.comp.Spec.Spec.Name +} + +// check executes the service check command +func (s *ServiceRuntime) check(ctx context.Context) error { + if s.comp.Spec.Spec.Service.Operations.Check == nil { + s.log.Errorf("missing check spec for %s service", s.comp.Spec.BinaryName) + return ErrOperationSpecUndefined + } + s.log.Debugf("check if the %s is installed", s.comp.Spec.BinaryName) + return s.executeServiceCommandImpl(ctx, s.log, s.comp.Spec.BinaryPath, s.comp.Spec.Spec.Service.Operations.Check) +} + +// install executes the service install command +func (s *ServiceRuntime) install(ctx context.Context) error { + if s.comp.Spec.Spec.Service.Operations.Install == nil { + s.log.Errorf("missing install spec for %s service", s.comp.Spec.BinaryName) + return ErrOperationSpecUndefined + } + s.log.Debugf("install %s service", s.comp.Spec.BinaryName) + return s.executeServiceCommandImpl(ctx, s.log, 
s.comp.Spec.BinaryPath, s.comp.Spec.Spec.Service.Operations.Install) +} + +// uninstall executes the service uninstall command +func (s *ServiceRuntime) uninstall(ctx context.Context) error { + return uninstallService(ctx, s.log, s.comp, s.executeServiceCommandImpl) +} + +// UninstallService uninstalls the service +func UninstallService(ctx context.Context, log *logger.Logger, comp component.Component) error { + return uninstallService(ctx, log, comp, executeServiceCommand) +} + +func uninstallService(ctx context.Context, log *logger.Logger, comp component.Component, executeServiceCommandImpl executeServiceCommandFunc) error { + if comp.Spec.Spec.Service.Operations.Uninstall == nil { + log.Errorf("missing uninstall spec for %s service", comp.Spec.BinaryName) + return ErrOperationSpecUndefined + } + log.Debugf("uninstall %s service", comp.Spec.BinaryName) + return executeServiceCommandImpl(ctx, log, comp.Spec.BinaryPath, comp.Spec.Spec.Service.Operations.Uninstall) +} diff --git a/pkg/component/runtime/service_command.go b/pkg/component/runtime/service_command.go new file mode 100644 index 00000000000..61ccda076be --- /dev/null +++ b/pkg/component/runtime/service_command.go @@ -0,0 +1,114 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package runtime + +import ( + "bufio" + "context" + "errors" + "fmt" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/dolmen-go/contextio" + + "github.com/elastic/elastic-agent/pkg/component" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/core/process" +) + +func executeCommand(ctx context.Context, log *logger.Logger, binaryPath string, args []string, env []string, timeout time.Duration) error { + log = log.With("context", "command output") + // Create context with timeout if the timeout is greater than 0 + if timeout > 0 { + var cn context.CancelFunc + ctx, cn = context.WithTimeout(ctx, timeout) + defer cn() + } + + opts := []process.StartOption{ + process.WithContext(ctx), + process.WithArgs(args), + process.WithEnv(env), + } + + // Set the command working directory from binary + // This is needed because the endpoint installer was looking for it's resources in the current working directory + wdir := filepath.Dir(binaryPath) + if wdir != "." { + opts = append(opts, + process.WithCmdOptions(func(c *exec.Cmd) error { + c.Dir = wdir + return nil + })) + } + + proc, err := process.Start(binaryPath, opts...) 
+ if err != nil { + return fmt.Errorf("failed starting the command: %w", err) + } + + // channel for the last error message from the stderr output + errch := make(chan string, 1) + ctxStderr := contextio.NewReader(ctx, proc.Stderr) + if ctxStderr != nil { + go func() { + var errText string + scanner := bufio.NewScanner(ctxStderr) + for scanner.Scan() { + line := scanner.Text() + if len(line) > 0 { + txt := strings.TrimSpace(line) + if len(txt) > 0 { + errText = txt + // Log error output line + log.Error(errText) + } + } + } + errch <- errText + }() + } + + procState := <-proc.Wait() + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + err = ctx.Err() // Process was killed due to timeout + } else if !procState.Success() { + err = &exec.ExitError{ProcessState: procState} + } + + if err != nil { + errmsg := <-errch + errmsg = strings.TrimSpace(errmsg) + if errmsg != "" { + err = fmt.Errorf("%s: %w", errmsg, err) + } + } + + return err +} + +func executeServiceCommand(ctx context.Context, log *logger.Logger, binaryPath string, spec *component.ServiceOperationsCommandSpec) error { + if spec == nil { + log.Warnf("spec is nil, nothing to execute, binaryPath: %s", binaryPath) + return nil + } + return executeCommand(ctx, log, binaryPath, spec.Args, envSpecToEnv(spec.Env), spec.Timeout) +} + +func envSpecToEnv(envSpecs []component.CommandEnvSpec) []string { + if len(envSpecs) == 0 { + return nil + } + + env := make([]string, len(envSpecs)) + + for i, spec := range envSpecs { + env[i] = spec.Name + "=" + spec.Value + } + return env +} diff --git a/pkg/component/runtime/service_command_test.go b/pkg/component/runtime/service_command_test.go new file mode 100644 index 00000000000..07e419f2472 --- /dev/null +++ b/pkg/component/runtime/service_command_test.go @@ -0,0 +1,183 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package runtime + +import ( + "context" + "errors" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "testing" + "text/template" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type progConfig struct { + ErrMessage string + ExitCode int + SleepMS int +} + +const testProgramTemplate = ` +package main + +import ( + "fmt" + "os" + "time" +) + +func main() { + if len("{{.ErrMessage}}") > 0 { + fmt.Fprintf(os.Stderr, "{{.ErrMessage}}") + } + if {{.SleepMS}} != 0 { + time.Sleep(time.Duration({{.SleepMS}})*time.Millisecond) + } + if {{.ExitCode}} != 0 { + os.Exit({{.ExitCode}}) + } +} +` +const testModFile = ` +module prog + +go 1.18 +` + +func renderTestProg(cfg progConfig) string { + t := template.Must(template.New("prog").Parse(testProgramTemplate)) + var b strings.Builder + err := t.Execute(&b, cfg) + if err != nil { + panic(err) + } + return b.String() +} + +func getExeName(name string) string { + if runtime.GOOS == "windows" { + return name + ".exe" + } + return name +} + +func prepareTestProg(ctx context.Context, log *logger.Logger, dir string, cfg progConfig) (string, error) { + const name = "prog" + + progPath := filepath.Join(dir, name+".go") + + prog := renderTestProg(cfg) + err := os.WriteFile(progPath, []byte(prog), 0600) + if err != nil { + return "", err + } + + err = os.WriteFile(filepath.Join(dir, "go.mod"), []byte(testModFile), 0600) + if err != nil { + return "", err + } + + err = executeCommand(ctx, log, "go", []string{"build", "-o", dir, progPath}, nil, 0) + if err != nil { + return "", err + } + + return filepath.Join(dir, getExeName(name)), nil +} + +func TestExecuteCommand(t *testing.T) { + log := logp.NewLogger("test_service") + + tests := []struct { + name string + cfg 
progConfig + timeout time.Duration + wantErr error + }{ + { + name: "success", + }, + { + name: "fail no error output", + cfg: progConfig{"", 1, 0}, + }, + { + name: "fail with error output", + cfg: progConfig{"something failed", 2, 0}, + }, + { + name: "fail with timeout", + cfg: progConfig{"", 3, 5000}, // executable runs for 5 seconds + timeout: 100 * time.Millisecond, + wantErr: context.DeadlineExceeded, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx, cn := context.WithCancel(context.Background()) + defer cn() + + dir := t.TempDir() + + // Prepare test program with expected param + exePath, err := prepareTestProg(ctx, log, dir, tc.cfg) + if err != nil { + t.Fatal(err) + } + + err = executeCommand(ctx, log, exePath, nil, nil, tc.timeout) + + if tc.wantErr != nil { + diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()) + if diff != "" { + t.Fatal(diff) + } + } else { + // If exit code is not 0, expect error + if tc.cfg.ExitCode == 0 { + if err != nil { + t.Fatal(err) + } + } else { + if err != nil { + var exerr *exec.ExitError + if errors.As(err, &exerr) { + diff := cmp.Diff(tc.cfg.ExitCode, exerr.ExitCode()) + if diff != "" { + t.Fatal(diff) + } + } else { + t.Fatalf("want *exec.ExitError, got %T", err) + } + } else { + t.Fatalf("want error code %v, got nil", tc.cfg.ExitCode) + } + } + } + + // Test that we get the proper error message + // The stderr message is prepended to the err, separated with ':', for example "something failed: exit status 2" + if err != nil && tc.cfg.ErrMessage != "" { + arr := strings.Split(err.Error(), ":") + diff := cmp.Diff(tc.cfg.ErrMessage, arr[0]) + if diff != "" { + t.Fatal(diff) + } + } + }) + } + +} diff --git a/pkg/component/runtime/state.go b/pkg/component/runtime/state.go index 4a39a21d82e..832b7548ba7 100644 --- a/pkg/component/runtime/state.go +++ b/pkg/component/runtime/state.go @@ -5,6 +5,7 @@ package runtime import ( + "errors" "reflect" 
"github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -131,7 +132,7 @@ func (s *ComponentState) syncExpected(comp *component.Component) bool { existing.configStateIdx = 1 changed = true } - if existing.err != unit.Err { + if !errors.Is(existing.err, unit.Err) { existing.err = unit.Err if existing.err != nil { existing.state = client.UnitStateFailed @@ -221,10 +222,9 @@ func (s *ComponentState) syncCheckin(checkin *proto.CheckinObserved) bool { if unit.Payload != nil { payload = unit.Payload.AsMap() } - touched[key] = true _, inExpected := s.expectedUnits[key] - existing, _ := s.Units[key] + existing := s.Units[key] existing.unitState = client.UnitState(unit.State) existing.unitMessage = unit.Message existing.unitPayload = payload @@ -400,6 +400,18 @@ func (s *ComponentState) forceState(state client.UnitState, msg string) bool { return changed } +// forceExpectedState force updates the expected state for the entire component, forcing that state on all expected units. +func (s *ComponentState) forceExpectedState(state client.UnitState) { + for k, unit := range s.expectedUnits { + if unit.state != state { + unit.state = state + } + + // unit is a copy and must be set back into the map + s.expectedUnits[k] = unit + } +} + // compState updates just the component state not all the units. func (s *ComponentState) compState(state client.UnitState, msg string) bool { if s.State != state || s.Message != msg { diff --git a/pkg/component/spec.go b/pkg/component/spec.go index 3d8b5cfe504..be20b92208c 100644 --- a/pkg/component/spec.go +++ b/pkg/component/spec.go @@ -81,10 +81,22 @@ func (t *CommandTimeoutSpec) InitDefaults() { t.Stop = 30 * time.Second } +// ServiceTimeoutSpec is the timeout specification for subprocess. +type ServiceTimeoutSpec struct { + Checkin time.Duration `config:"checkin" yaml:"checkin"` +} + +// InitDefaults initialized the defaults for the timeouts. 
+func (t *ServiceTimeoutSpec) InitDefaults() { + t.Checkin = 30 * time.Second +} + // ServiceSpec is the specification for an input that executes as a service. type ServiceSpec struct { + CPort int `config:"cport" yaml:"cport" validate:"required"` Log *ServiceLogSpec `config:"log,omitempty" yaml:"log,omitempty"` Operations ServiceOperationsSpec `config:"operations" yaml:"operations" validate:"required"` + Timeouts ServiceTimeoutSpec `config:"timeouts" yaml:"timeouts"` } // ServiceLogSpec is the specification for the log path that the service logs to. diff --git a/pkg/component/spec_test.go b/pkg/component/spec_test.go index 33866df5b0f..c51ef4b4517 100644 --- a/pkg/component/spec_test.go +++ b/pkg/component/spec_test.go @@ -130,6 +130,8 @@ inputs: outputs: - shipper service: + name: "co.elastic.endpoint" + cport: 6788 operations: install: args: ["install"] diff --git a/pkg/core/process/process.go b/pkg/core/process/process.go index 428469687b6..553dc4989dd 100644 --- a/pkg/core/process/process.go +++ b/pkg/core/process/process.go @@ -17,51 +17,78 @@ type Info struct { PID int Process *os.Process Stdin io.WriteCloser + Stderr io.ReadCloser } -// Option is an option func to change the underlying command -type Option func(c *exec.Cmd) error +// CmdOption is an option func to change the underlying command +type CmdOption func(c *exec.Cmd) error + +// StartConfig configuration for the process start set by the StartOption functions +type StartConfig struct { + ctx context.Context + uid, gid int + args, env []string + cmdOpts []CmdOption +} + +// StartOption start options function +type StartOption func(cfg *StartConfig) // Start starts a new process -func Start(path string, uid, gid int, args []string, env []string, opts ...Option) (proc *Info, err error) { - return StartContext(nil, path, uid, gid, args, env, opts...) 
//nolint:staticcheck // calls a different function if no ctx +func Start(path string, opts ...StartOption) (proc *Info, err error) { + // Apply options + c := StartConfig{ + uid: os.Geteuid(), + gid: os.Getegid(), + } + + for _, opt := range opts { + opt(&c) + } + + return startContext(c.ctx, path, c.uid, c.gid, c.args, c.env, c.cmdOpts...) } -// StartContext starts a new process with context. -func StartContext(ctx context.Context, path string, uid, gid int, args []string, env []string, opts ...Option) (*Info, error) { - cmd, err := getCmd(ctx, path, env, uid, gid, args...) - if err != nil { - return nil, fmt.Errorf("failed to create command for %q: %w", path, err) +// WithContext sets an optional context +func WithContext(ctx context.Context) StartOption { + return func(cfg *StartConfig) { + cfg.ctx = ctx } - for _, o := range opts { - if err := o(cmd); err != nil { - return nil, fmt.Errorf("failed to set option command for %q: %w", path, err) - } +} + +// WithArgs sets arguments +func WithArgs(args []string) StartOption { + return func(cfg *StartConfig) { + cfg.args = args } - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, fmt.Errorf("failed to create stdin for %q: %w", path, err) +} + +// WithEnv sets the environment variables +func WithEnv(env []string) StartOption { + return func(cfg *StartConfig) { + cfg.env = env } +} - // start process - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("failed to start %q: %w", path, err) +// WithUID sets UID +func WithUID(uid int) StartOption { + return func(cfg *StartConfig) { + cfg.uid = uid } +} - // Hook to JobObject on windows, noop on other platforms. - // This ties the application processes lifespan to the agent's. - // Fixes the orphaned beats processes left behind situation - // after the agent process gets killed. 
- if err := JobObject.Assign(cmd.Process); err != nil { - _ = killCmd(cmd.Process) - return nil, fmt.Errorf("failed job assignment %q: %w", path, err) +// WithGID sets GID +func WithGID(gid int) StartOption { + return func(cfg *StartConfig) { + cfg.gid = gid } +} - return &Info{ - PID: cmd.Process.Pid, - Process: cmd.Process, - Stdin: stdin, - }, err +// WithCmdOptions sets the exec.Cmd options +func WithCmdOptions(cmdOpts ...CmdOption) StartOption { + return func(cfg *StartConfig) { + cfg.cmdOpts = cmdOpts + } } // Kill kills the process. @@ -99,3 +126,49 @@ func (i *Info) Wait() <-chan *os.ProcessState { return ch } + +// startContext starts a new process with context. The context is optional and can be nil. +func startContext(ctx context.Context, path string, uid, gid int, args []string, env []string, opts ...CmdOption) (*Info, error) { + cmd, err := getCmd(ctx, path, env, uid, gid, args...) + if err != nil { + return nil, fmt.Errorf("failed to create command for %q: %w", path, err) + } + for _, o := range opts { + if err := o(cmd); err != nil { + return nil, fmt.Errorf("failed to set option command for %q: %w", path, err) + } + } + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdin for %q: %w", path, err) + } + + var stderr io.ReadCloser + if cmd.Stderr == nil { + stderr, err = cmd.StderrPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stderr for %q: %w", path, err) + } + } + + // start process + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start %q: %w", path, err) + } + + // Hook to JobObject on windows, noop on other platforms. + // This ties the application processes lifespan to the agent's. + // Fixes the orphaned beats processes left behind situation + // after the agent process gets killed. 
+ if err := JobObject.Assign(cmd.Process); err != nil { + _ = killCmd(cmd.Process) + return nil, fmt.Errorf("failed job assignment %q: %w", path, err) + } + + return &Info{ + PID: cmd.Process.Pid, + Process: cmd.Process, + Stdin: stdin, + Stderr: stderr, + }, err +} diff --git a/specs/endpoint-security.spec.yml b/specs/endpoint-security.spec.yml index a34c66086de..69827c68e75 100644 --- a/specs/endpoint-security.spec.yml +++ b/specs/endpoint-security.spec.yml @@ -14,6 +14,7 @@ inputs: - condition: ${runtime.arch} == 'arm64' and ${runtime.family} == 'redhat' and ${runtime.major} == '7' message: "No support for RHEL7 on arm64" service: + cport: 6788 log: path: "/opt/Elastic/Endpoint/state/log/endpoint-*.log" operations: &operations @@ -46,6 +47,7 @@ inputs: outputs: - elasticsearch service: + cport: 6788 log: path: "/Library/Elastic/Endpoint/state/log/endpoint-*.log" operations: *operations @@ -60,6 +62,7 @@ inputs: - condition: ${runtime.user.root} == false message: "Elastic Agent must be running as Administrator or SYSTEM" service: + cport: 6788 log: path: "C:\\Program Files\\Elastic\\Endpoint\\state\\log\\endpoint-*.log" - operations: *operations + operations: *operations \ No newline at end of file