diff --git a/NOTICE-fips.txt b/NOTICE-fips.txt index 457b519c7f1..ed26e2e225a 100644 --- a/NOTICE-fips.txt +++ b/NOTICE-fips.txt @@ -5778,6 +5778,217 @@ Contents of probable licence file $GOMODCACHE/github.com/open-telemetry/opentele limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension +Version: v0.127.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension@v0.127.0/LICENSE: + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver Version: v0.127.0 diff --git a/NOTICE.txt b/NOTICE.txt index 9f38c1934e8..950b2bc9e7a 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5989,6 +5989,217 @@ Contents of probable licence file $GOMODCACHE/github.com/open-telemetry/opentele limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension +Version: v0.127.0 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension@v0.127.0/LICENSE: + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver Version: v0.127.0 diff --git a/go.mod b/go.mod index dfb4adf10f8..b566be80abf 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter v0.127.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension v0.127.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.127.0 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension v0.127.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver v0.127.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.127.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status v0.127.0 diff --git a/go.sum b/go.sum index 03750ddecf0..f124184880c 100644 --- a/go.sum +++ b/go.sum @@ -1279,6 +1279,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otl github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/otlpencodingextension v0.127.0/go.mod h1:PuBIKJHNhHfyrBojM0q9OJDoUB2pA7cb2ytVYd3jLvA= github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.127.0 h1:jZ1jm12S1KZ28lI9mqWbqnAnTVVBoBOcDGTmgrTQJ08= github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension v0.127.0/go.mod h1:muh17dM7w489SrmFc6n42wWA+wWvTLV1BlE92xUY150= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension v0.127.0 h1:YMWd06HhyJY6iTlwfENNMEXNIyVtQ6A2mZ9TUt/Wje4= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension v0.127.0/go.mod h1:PaDEiW3h30o0LQlemkTaKbPac6RzxnDthw/ZKQQkbLE= github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.127.0 h1:FVzar7k+uGx7hrLUqwfKhCyrfQFCWLqRUNR/W5zUsoc= github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector v0.127.0/go.mod h1:N1aky/3s9tGQkSVII5BoemeEp6rCsFLjxg7pmDhOSV0= github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer v0.127.0 h1:IxhkClhCt4W/KhyRru1vIzOVmVzRpwRsGh6D+vLgP+0= diff --git a/internal/pkg/agent/application/application.go b/internal/pkg/agent/application/application.go index fb02f9bcc99..f887e37d0dc 100644 --- a/internal/pkg/agent/application/application.go +++ b/internal/pkg/agent/application/application.go @@ -224,7 +224,10 @@ func New( return nil, nil, nil, errors.New(err, "failed to initialize composable controller") } - otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"), baseLogger) + otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err) + } coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, compModifiers...) if managed != nil { // the coordinator requires the config manager as well as in managed-mode the config manager requires the diff --git a/internal/pkg/agent/application/coordinator/coordinator_test.go b/internal/pkg/agent/application/coordinator/coordinator_test.go index 8425298658b..4a8c0d47b85 100644 --- a/internal/pkg/agent/application/coordinator/coordinator_test.go +++ b/internal/pkg/agent/application/coordinator/coordinator_test.go @@ -1078,7 +1078,8 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt cfg.Port = 0 rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg) require.NoError(t, err) - otelMgr := otelmanager.NewOTelManager(l, l) + otelMgr, err := otelmanager.NewOTelManager(l, logp.InfoLevel, l, otelmanager.EmbeddedExecutionMode) + require.NoError(t, err) caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l) require.NoError(t, err) diff --git a/internal/pkg/agent/cmd/otel.go b/internal/pkg/agent/cmd/otel.go index 5f01b5d5ca0..c6b6b7dfda9 100644 --- a/internal/pkg/agent/cmd/otel.go +++ b/internal/pkg/agent/cmd/otel.go @@ -6,14 +6,25 @@ package cmd import ( "context" + "fmt" "os" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "github.com/spf13/cobra" "github.com/spf13/pflag" + "go.opentelemetry.io/collector/otelcol" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/service" + "github.com/elastic/elastic-agent/internal/pkg/cli" "github.com/elastic/elastic-agent/internal/pkg/otel" + "github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider" + "github.com/elastic/elastic-agent/internal/pkg/otel/manager" + "github.com/elastic/elastic-agent/internal/pkg/release" + "github.com/elastic/elastic-agent/pkg/core/logger" ) func newOtelCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command { @@ -26,10 +37,18 @@ func newOtelCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Comman if err != nil { return err } + supervised, err := cmd.Flags().GetBool(manager.OtelSetSupervisedFlagName) + if err != nil { + return err + } + supervisedLoggingLevel, err := cmd.Flags().GetString(manager.OtelSupervisedLoggingLevelFlagName) + if err != nil { + return err + } if err := prepareEnv(); err != nil { return err } - return runCollector(cmd.Context(), cfgFiles) + return RunCollector(cmd.Context(), cfgFiles, supervised, supervisedLoggingLevel) }, PreRun: func(c *cobra.Command, args []string) { // hide inherited flags not to bloat help with flags not related to otel @@ -57,7 +76,11 @@ func hideInheritedFlags(c *cobra.Command) { }) } -func runCollector(cmdCtx context.Context, configFiles []string) error { +func RunCollector(cmdCtx context.Context, configFiles []string, supervised bool, supervisedLoggingLevel string) error { + settings, err := prepareCollectorSettings(configFiles, supervised, supervisedLoggingLevel) + if err != nil { + return fmt.Errorf("failed to prepare collector settings: %w", err) + } // Windows: Mark service as stopped. // After this is run, the service is considered by the OS to be stopped. // This must be the first deferred cleanup task (last to execute). @@ -79,7 +102,58 @@ func runCollector(cmdCtx context.Context, configFiles []string) error { defer cancel() go service.ProcessWindowsControlEvents(stopCollector) - return otel.Run(ctx, stop, configFiles) + return otel.Run(ctx, stop, settings) +} + +func prepareCollectorSettings(configFiles []string, supervised bool, supervisedLoggingLevel string) (*otelcol.CollectorSettings, error) { + var settings *otelcol.CollectorSettings + if supervised { + // add stdin config provider + configProvider, err := agentprovider.NewBufferProvider(os.Stdin) + if err != nil { + return nil, fmt.Errorf("failed to create config provider: %w", err) + } + settings = otel.NewSettings(release.Version(), []string{configProvider.URI()}, + otel.WithConfigProviderFactory(configProvider.NewFactory()), + ) + + // setup logger + defaultCfg := logger.DefaultLoggingConfig() + defaultEventLogCfg := logger.DefaultEventLoggingConfig() + + defaultCfg.ToStderr = true + defaultCfg.ToFiles = false + + defaultEventLogCfg.ToFiles = false + defaultEventLogCfg.ToStderr = true + + var logLevelSettingErr error + if supervisedLoggingLevel != "" { + if logLevelSettingErr = defaultCfg.Level.Unpack(supervisedLoggingLevel); logLevelSettingErr != nil { + defaultCfg.Level = logp.InfoLevel + } + } else { + defaultCfg.Level = logp.InfoLevel + } + + l, err := logger.NewFromConfig("edot", defaultCfg, defaultEventLogCfg, false) + if err != nil { + return nil, fmt.Errorf("failed to create logger: %w", err) + } + + if logLevelSettingErr != nil { + l.Warnf("Fallback to default logging level due to: %v", logLevelSettingErr) + } + + settings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { + return l.Core() + })} + + settings.DisableGracefulShutdown = false + } else { + settings = otel.NewSettings(release.Version(), configFiles) + } + return settings, nil } func prepareEnv() error { diff --git a/internal/pkg/agent/cmd/otel_flags.go b/internal/pkg/agent/cmd/otel_flags.go index 57cae05884e..e5818eee19a 100644 --- a/internal/pkg/agent/cmd/otel_flags.go +++ b/internal/pkg/agent/cmd/otel_flags.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/featuregate" "github.com/elastic/elastic-agent/internal/pkg/agent/application/paths" + "github.com/elastic/elastic-agent/internal/pkg/otel/manager" ) const ( @@ -28,6 +29,16 @@ func setupOtelFlags(flags *pflag.FlagSet) { flags.StringArray(otelSetFlagName, []string{}, "Set arbitrary component config property. The component has to be defined in the config file and the flag"+ " has a higher precedence. Array config properties are overridden and maps are joined. Example --set=processors.batch.timeout=2s") + flags.Bool(manager.OtelSetSupervisedFlagName, false, "Set that this collector is supervised.") + // the only error we can get here is that the flag does not exist + // but look above, so we explicitly ignore it + _ = flags.MarkHidden(manager.OtelSetSupervisedFlagName) + + flags.String(manager.OtelSupervisedLoggingLevelFlagName, "info", "Set the logging level of the supervised collector.") + // the only error we can get here is that the flag does not exist + // but look above, so we explicitly ignore it + _ = flags.MarkHidden(manager.OtelSupervisedLoggingLevelFlagName) + goFlags := new(flag.FlagSet) featuregate.GlobalRegistry().RegisterFlags(goFlags) diff --git a/internal/pkg/agent/cmd/otel_test.go b/internal/pkg/agent/cmd/otel_test.go new file mode 100644 index 00000000000..073fd46d58b --- /dev/null +++ b/internal/pkg/agent/cmd/otel_test.go @@ -0,0 +1,81 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package cmd + +import ( + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider" +) + +func TestPrepareCollectorSettings(t *testing.T) { + t.Run("returns valid settings in supervised mode", func(t *testing.T) { + // mock stdin with a basic OTEL config if needed + oldStdin := os.Stdin + defer func() { os.Stdin = oldStdin }() + + r, w, err := os.Pipe() + require.NoError(t, err, "failed to create pipe") + _, err = w.WriteString(`receivers: { otlp: {} }`) + require.NoError(t, err, "failed to write to pipe") + require.NoError(t, w.Close(), "failed to close pipe") + os.Stdin = r + + settings, err := prepareCollectorSettings(nil, true, "info") + require.NoError(t, err, "failed to prepare collector settings") + require.NotNil(t, settings, "settings should not be nil") + require.NotNil(t, settings.ConfigProviderSettings.ResolverSettings.URIs, "URIs should not be nil") + agentProviderURIFound := false + for _, uri := range settings.ConfigProviderSettings.ResolverSettings.URIs { + agentProviderURIFound = strings.Contains(uri, agentprovider.AgentConfigProviderSchemeName) + if agentProviderURIFound { + break + } + } + require.True(t, agentProviderURIFound, "agentprovider Scheme not found in the URIS of ConfigProviderSettings") + require.NotNil(t, settings.LoggingOptions, "loggingOptions should not be nil for supervised mode") + }) + + t.Run("returns valid settings in standalone mode", func(t *testing.T) { + settings, err := prepareCollectorSettings([]string{"fake-config.yaml"}, false, "info") + require.NoError(t, err, "failed to prepare collector settings") + require.NotNil(t, settings, "settings should not be nil") + require.Contains(t, settings.ConfigProviderSettings.ResolverSettings.URIs, "fake-config.yaml", "fake-config.yaml not found in the URIS of ConfigProviderSettings") + }) + + t.Run("fails when supervised mode has invalid config from stdin", func(t *testing.T) { + oldStdin := os.Stdin + defer func() { os.Stdin = oldStdin }() + r, w, err := os.Pipe() + require.NoError(t, err, "failed to create pipe") + _, err = w.WriteString(`receivers { otlp: {} }`) // invalid yaml + require.NoError(t, err, "failed to write to pipe") + require.NoError(t, w.Close(), "failed to close pipe") + os.Stdin = r + + settings, err := prepareCollectorSettings(nil, true, "info") + require.Error(t, err) + require.Nil(t, settings) + }) + + t.Run("doesn't fail when unsupervised mode has invalid config from stdin", func(t *testing.T) { + oldStdin := os.Stdin + defer func() { os.Stdin = oldStdin }() + r, w, err := os.Pipe() + require.NoError(t, err, "failed to create pipe") + _, err = w.WriteString(`receivers { otlp: {} }`) + require.NoError(t, err, "failed to write to pipe") + require.NoError(t, w.Close(), "failed to close pipe") + os.Stdin = r + + settings, err := prepareCollectorSettings(nil, false, "info") + require.NoError(t, err) + require.NotNil(t, settings) + }) +} diff --git a/internal/pkg/otel/README.md b/internal/pkg/otel/README.md index 5f3046c306b..83b369cffce 100644 --- a/internal/pkg/otel/README.md +++ b/internal/pkg/otel/README.md @@ -91,6 +91,7 @@ This section provides a summary of components included in the Elastic Distributi | [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.127.0/extension/bearertokenauthextension/README.md) | v0.127.0 | | [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.127.0/extension/storage/filestorage/README.md) | v0.127.0 | | [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.127.0/extension/healthcheckextension/README.md) | v0.127.0 | +| [healthcheckv2extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckv2extension/v0.127.0/extension/healthcheckv2extension/README.md) | v0.127.0 | | [k8sobserver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/observer/k8sobserver/v0.127.0/extension/observer/k8sobserver/README.md) | v0.127.0 | | [memorylimiterextension](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/memorylimiterextension/v0.127.0/extension/memorylimiterextension/README.md) | v0.127.0 | | [pprofextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/pprofextension/v0.127.0/extension/pprofextension/README.md) | v0.127.0 | diff --git a/internal/pkg/otel/agentprovider/buffer_provider.go b/internal/pkg/otel/agentprovider/buffer_provider.go new file mode 100644 index 00000000000..ac86211d35c --- /dev/null +++ b/internal/pkg/otel/agentprovider/buffer_provider.go @@ -0,0 +1,85 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package agentprovider + +import ( + "context" + "fmt" + "io" + + "github.com/gofrs/uuid/v5" + "go.opentelemetry.io/collector/confmap" +) + +// build time guard that BufferProvider implements confmap.Provider +var _ confmap.Provider = (*BufferProvider)(nil) + +// BufferProvider is a fixed provider that has a factory but only returns the same provider. +type BufferProvider struct { + uri string + cfg *confmap.Conf +} + +// NewBufferProvider creates a `agentprovider.BufferProvider`. +func NewBufferProvider(in io.Reader) (*BufferProvider, error) { + uri := fmt.Sprintf("%s:%s", AgentConfigProviderSchemeName, uuid.Must(uuid.NewV4()).String()) + + if in == nil { + return &BufferProvider{ + uri: uri, + cfg: nil, + }, nil + } + + configBytes, err := io.ReadAll(in) + if err != nil { + return nil, fmt.Errorf("failed to read config from buffer: %w", err) + } + + retrieved, err := confmap.NewRetrievedFromYAML(configBytes) + if err != nil { + return nil, fmt.Errorf("failed to parse config from buffer: %w", err) + } + + conf, err := retrieved.AsConf() + if err != nil { + return nil, fmt.Errorf("failed to convert config to confmap: %w", err) + } + + return &BufferProvider{ + uri: uri, + cfg: conf, + }, nil +} + +// NewFactory provides a factory. This factory doesn't create a new provider on each call. It always returns the same provider. +func (p *BufferProvider) NewFactory() confmap.ProviderFactory { + return confmap.NewProviderFactory(func(_ confmap.ProviderSettings) confmap.Provider { + return p + }) +} + +// Retrieve returns the latest configuration. +func (p *BufferProvider) Retrieve(_ context.Context, uri string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) { + if uri != p.uri { + return nil, fmt.Errorf("%q uri doesn't equal defined %q provider", uri, AgentConfigProviderSchemeName) + } + return confmap.NewRetrieved(p.cfg.ToStringMap()) +} + +// Scheme is the scheme for this provider. +func (p *BufferProvider) Scheme() string { + return AgentConfigProviderSchemeName +} + +// Shutdown called by collect when stopping. +func (p *BufferProvider) Shutdown(context.Context) error { + return nil +} + +// URI returns the URI to be used for this provider. +func (p *BufferProvider) URI() string { + return p.uri +} diff --git a/internal/pkg/otel/agentprovider/buffer_provider_test.go b/internal/pkg/otel/agentprovider/buffer_provider_test.go new file mode 100644 index 00000000000..556fda66d5a --- /dev/null +++ b/internal/pkg/otel/agentprovider/buffer_provider_test.go @@ -0,0 +1,74 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package agentprovider + +import ( + "bytes" + "context" + "testing" + + "gopkg.in/yaml.v3" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" +) + +func TestBufferProvider_NewFactory(t *testing.T) { + p, err := NewBufferProvider(nil) + require.NoError(t, err) + assert.Equal(t, p, p.NewFactory().Create(confmap.ProviderSettings{})) +} + +func TestBufferProvider_Schema(t *testing.T) { + p, err := NewBufferProvider(nil) + require.NoError(t, err) + assert.Equal(t, AgentConfigProviderSchemeName, p.Scheme()) +} + +func TestBufferProvider_URI(t *testing.T) { + p, err := NewBufferProvider(nil) + require.NoError(t, err) + assert.Equal(t, p.uri, p.URI()) +} + +func TestBufferProvider_Retrieve(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfg := confmap.New() + confMap := cfg.ToStringMap() + confBytes, err := yaml.Marshal(confMap) + require.NoError(t, err) + + p, err := NewBufferProvider(bytes.NewReader(confBytes)) + require.NoError(t, err) + ret, err := p.Retrieve(ctx, p.URI(), func(event *confmap.ChangeEvent) {}) + require.NoError(t, err) + retCfg, err := ret.AsConf() + require.NoError(t, err) + require.Equal(t, cfg, retCfg) +} + +func TestBufferProvider_Shutdown(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfg := confmap.New() + confMap := cfg.ToStringMap() + confBytes, err := yaml.Marshal(confMap) + require.NoError(t, err) + + p, err := NewBufferProvider(bytes.NewReader(confBytes)) + require.NoError(t, err) + ret, err := p.Retrieve(ctx, p.URI(), func(event *confmap.ChangeEvent) {}) + require.NoError(t, err) + retCfg, err := ret.AsConf() + require.NoError(t, err) + require.Equal(t, cfg, retCfg) + + err = p.Shutdown(ctx) + require.NoError(t, err) +} diff --git a/internal/pkg/otel/agentprovider/provider.go b/internal/pkg/otel/agentprovider/provider.go index 02cf3e3b399..199369b9db2 100644 --- a/internal/pkg/otel/agentprovider/provider.go +++ b/internal/pkg/otel/agentprovider/provider.go @@ -13,7 +13,8 @@ import ( "go.opentelemetry.io/collector/confmap" ) -const schemeName = "elasticagent" +// build time guard that BufferProvider implements confmap.Provider +var _ confmap.Provider = (*Provider)(nil) // Provider is a fixed provider that has a factory but only returns the same provider. type Provider struct { @@ -29,7 +30,7 @@ type Provider struct { // NewProvider creates a `agentprovider.Provider`. func NewProvider(cfg *confmap.Conf) *Provider { - uri := fmt.Sprintf("%s:%s", schemeName, uuid.Must(uuid.NewV4()).String()) + uri := fmt.Sprintf("%s:%s", AgentConfigProviderSchemeName, uuid.Must(uuid.NewV4()).String()) return &Provider{ uri: uri, cfg: cfg, @@ -61,7 +62,7 @@ func (p *Provider) Update(cfg *confmap.Conf) { // Retrieve returns the latest configuration. func (p *Provider) Retrieve(ctx context.Context, uri string, watcher confmap.WatcherFunc) (*confmap.Retrieved, error) { if uri != p.uri { - return nil, fmt.Errorf("%q uri doesn't equal defined %q provider", uri, schemeName) + return nil, fmt.Errorf("%q uri doesn't equal defined %q provider", uri, AgentConfigProviderSchemeName) } // get latest cfg at time of call @@ -87,7 +88,7 @@ func (p *Provider) Retrieve(ctx context.Context, uri string, watcher confmap.Wat // Scheme is the scheme for this provider. func (p *Provider) Scheme() string { - return schemeName + return AgentConfigProviderSchemeName } // Shutdown called by collect when stopping. diff --git a/internal/pkg/otel/agentprovider/provider_test.go b/internal/pkg/otel/agentprovider/provider_test.go index 5b6cea50439..d381ffcd84f 100644 --- a/internal/pkg/otel/agentprovider/provider_test.go +++ b/internal/pkg/otel/agentprovider/provider_test.go @@ -20,7 +20,7 @@ func TestProvider_NewFactory(t *testing.T) { func TestProvider_Schema(t *testing.T) { p := NewProvider(nil) - assert.Equal(t, schemeName, p.Scheme()) + assert.Equal(t, AgentConfigProviderSchemeName, p.Scheme()) } func TestProvider_URI(t *testing.T) { diff --git a/internal/pkg/otel/agentprovider/scheme.go b/internal/pkg/otel/agentprovider/scheme.go new file mode 100644 index 00000000000..538aa825034 --- /dev/null +++ b/internal/pkg/otel/agentprovider/scheme.go @@ -0,0 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package agentprovider + +const AgentConfigProviderSchemeName = "elasticagent" diff --git a/internal/pkg/otel/components.go b/internal/pkg/otel/components.go index 02d8de20b6c..56fc58fe158 100644 --- a/internal/pkg/otel/components.go +++ b/internal/pkg/otel/components.go @@ -59,6 +59,7 @@ import ( // Extensions "github.com/open-telemetry/opentelemetry-collector-contrib/extension/bearertokenauthextension" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension" + healthcheckv2extension "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckv2extension" k8sobserver "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver" pprofextension "github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension" filestorage "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage" @@ -153,6 +154,7 @@ func components(extensionFactories ...extension.Factory) func() (otelcol.Factori } extensions := []extension.Factory{ + healthcheckv2extension.NewFactory(), memorylimiterextension.NewFactory(), filestorage.NewFactory(), healthcheckextension.NewFactory(), diff --git a/internal/pkg/otel/manager/common.go b/internal/pkg/otel/manager/common.go new file mode 100644 index 00000000000..218bbebd95d --- /dev/null +++ b/internal/pkg/otel/manager/common.go @@ -0,0 +1,62 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + "fmt" + "net" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" +) + +// for testing purposes +var netListen = net.Listen + +// reportErr sends an error to the provided error channel. It first drains the channel +// to ensure that only the most recent error is kept, as intermediate errors can be safely discarded. +// This ensures the receiver always observes the latest reported error. +func reportErr(ctx context.Context, errCh chan error, err error) { + select { + case <-ctx.Done(): + // context is already done + return + case <-errCh: + // drain the error channel first + default: + } + select { + case errCh <- err: + case <-ctx.Done(): + } +} + +// reportStatus sends the new status to the status channel. +func reportStatus(ctx context.Context, statusCh chan *status.AggregateStatus, statuses *status.AggregateStatus) { + select { + case <-ctx.Done(): + return + case statusCh <- statuses: + } +} + +// findRandomTCPPort finds a random available TCP port on the localhost interface. +func findRandomTCPPort() (int, error) { + l, err := netListen("tcp", "localhost:0") + if err != nil { + return 0, err + } + + port := l.Addr().(*net.TCPAddr).Port + err = l.Close() + if err != nil { + return 0, err + } + if port == 0 { + return 0, fmt.Errorf("failed to find random port") + } + + return port, nil +} diff --git a/internal/pkg/otel/manager/common_test.go b/internal/pkg/otel/manager/common_test.go new file mode 100644 index 00000000000..d616bbc19e4 --- /dev/null +++ b/internal/pkg/otel/manager/common_test.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "errors" + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestFindRandomPort(t *testing.T) { + port, err := findRandomTCPPort() + require.NoError(t, err) + require.NotEqual(t, 0, port) + + defer func() { + netListen = net.Listen + }() + + netListen = func(string, string) (net.Listener, error) { + return nil, errors.New("some error") + } + _, err = findRandomTCPPort() + require.Error(t, err, "failed to find random port") +} diff --git a/internal/pkg/otel/manager/execution.go b/internal/pkg/otel/manager/execution.go new file mode 100644 index 00000000000..d55ff6c09f4 --- /dev/null +++ b/internal/pkg/otel/manager/execution.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/confmap" + + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +type collectorExecution interface { + startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) +} + +type collectorHandle interface { + Stop(ctx context.Context) +} diff --git a/internal/pkg/otel/manager/execution_embedded.go b/internal/pkg/otel/manager/execution_embedded.go new file mode 100644 index 00000000000..696dde81428 --- /dev/null +++ b/internal/pkg/otel/manager/execution_embedded.go @@ -0,0 +1,80 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/otelcol" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent/internal/pkg/otel" + "github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider" + "github.com/elastic/elastic-agent/internal/pkg/release" + "github.com/elastic/elastic-agent/pkg/core/logger" +) + +func newExecutionEmbedded() *embeddedExecution { + return &embeddedExecution{} +} + +type embeddedExecution struct { +} + +// startCollector starts the collector in a new goroutine. +func (r *embeddedExecution) startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) { + collectorCtx, collectorCancel := context.WithCancel(ctx) + ap := agentprovider.NewProvider(cfg) + + ctl := &ctxHandle{ + collectorDoneCh: make(chan struct{}), + cancel: collectorCancel, + } + + // NewForceExtensionConverterFactory is used to ensure that the agent_status extension is always enabled. + // It is required for the Elastic Agent to extract the status out of the OTel collector. + settings := otel.NewSettings( + release.Version(), []string{ap.URI()}, + otel.WithConfigProviderFactory(ap.NewFactory()), + otel.WithConfigConvertorFactory(NewForceExtensionConverterFactory(AgentStatusExtensionType.String())), + otel.WithExtensionFactory(NewAgentStatusFactory(statusCh))) + settings.DisableGracefulShutdown = true // managed by this manager + settings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { + return logger.Core() // use same zap as agent + })} + svc, err := otelcol.NewCollector(*settings) + if err != nil { + collectorCancel() + return nil, err + } + go func() { + runErr := svc.Run(collectorCtx) + close(ctl.collectorDoneCh) + reportErr(ctx, errCh, runErr) + }() + return ctl, nil +} + +type ctxHandle struct { + collectorDoneCh chan struct{} + cancel context.CancelFunc +} + +// Stop stops the collector +func (s *ctxHandle) Stop(ctx context.Context) { + if s.cancel == nil { + return + } + + s.cancel() + + select { + case <-ctx.Done(): + case <-s.collectorDoneCh: + } +} diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go new file mode 100644 index 00000000000..c94620e2991 --- /dev/null +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -0,0 +1,229 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "time" + + "gopkg.in/yaml.v3" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/confmap" + "go.uber.org/zap/zapcore" + + "github.com/elastic/elastic-agent-libs/logp" + + runtimeLogger "github.com/elastic/elastic-agent/pkg/component/runtime" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent/pkg/core/process" +) + +const ( + processKillAfter = 5 * time.Second + + OtelSetSupervisedFlagName = "supervised" + OtelSupervisedLoggingLevelFlagName = "supervised.logging.level" +) + +func newSubprocessExecution(logLevel logp.Level, collectorPath string) *subprocessExecution { + return &subprocessExecution{ + collectorPath: collectorPath, + collectorArgs: []string{ + "otel", + fmt.Sprintf("--%s", OtelSetSupervisedFlagName), + fmt.Sprintf("--%s=%s", OtelSupervisedLoggingLevelFlagName, logLevel.String()), + }, + logLevel: logLevel, + } +} + +type subprocessExecution struct { + collectorPath string + collectorArgs []string + logLevel logp.Level +} + +// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the +// processErrCh channel. Other run errors, such as not able to connect to the health endpoint, are sent to the runErrCh channel. +func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, processErrCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) { + if cfg == nil { + // configuration is required + return nil, errors.New("no configuration provided") + } + + if r.collectorPath == "" { + // collector path is required + return nil, errors.New("no collector path provided") + } + + if _, err := os.Stat(r.collectorPath); err != nil { + // we cannot access the collector path + return nil, fmt.Errorf("cannot access collector path: %w", err) + } + + httpHealthCheckPort, err := findRandomTCPPort() + if err != nil { + return nil, fmt.Errorf("could not find port for http health check: %w", err) + } + + if err := injectHeathCheckV2Extension(cfg, httpHealthCheckPort); err != nil { + return nil, fmt.Errorf("failed to inject health check extension: %w", err) + } + + confMap := cfg.ToStringMap() + confBytes, err := yaml.Marshal(confMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal config to yaml: %w", err) + } + + stdOut := runtimeLogger.NewLogWriterWithDefaults(logger.Core(), zapcore.Level(r.logLevel)) + // info level for stdErr because by default collector writes to stderr + stdErr := runtimeLogger.NewLogWriterWithDefaults(logger.Core(), zapcore.Level(r.logLevel)) + + procCtx, procCtxCancel := context.WithCancel(ctx) + processInfo, err := process.Start(r.collectorPath, + process.WithArgs(r.collectorArgs), + process.WithContext(procCtx), + process.WithEnv(os.Environ()), + process.WithCmdOptions(func(c *exec.Cmd) error { + c.Stdin = bytes.NewReader(confBytes) + c.Stdout = stdOut + c.Stderr = stdErr + return nil + }), + ) + if err != nil { + // we failed to start the process + procCtxCancel() + return nil, fmt.Errorf("failed to start supervised collector: %w", err) + } + logger.Infof("supervised collector started with pid: %d and healthcheck port: %d", processInfo.Process.Pid, httpHealthCheckPort) + if processInfo.Process == nil { + // this should not happen but just in case + procCtxCancel() + return nil, fmt.Errorf("failed to start supervised collector: process is nil") + } + + ctl := &procHandle{ + processDoneCh: make(chan struct{}), + processInfo: processInfo, + } + + healthCheckDone := make(chan struct{}) + go func() { + defer func() { + close(healthCheckDone) + }() + currentStatus := aggregateStatus(componentstatus.StatusStarting, nil) + reportStatus(ctx, statusCh, currentStatus) + + // specify a max duration of not being able to get the status from the collector + const maxFailuresDuration = 130 * time.Second + maxFailuresTimer := time.NewTimer(maxFailuresDuration) + defer maxFailuresTimer.Stop() + + // check the health of the collector every 1 second + const healthCheckPollDuration = 1 * time.Second + healthCheckPollTimer := time.NewTimer(healthCheckPollDuration) + defer healthCheckPollTimer.Stop() + for { + statuses, err := AllComponentsStatuses(procCtx, httpHealthCheckPort) + if err != nil { + switch { + case errors.Is(err, context.Canceled): + reportStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) + return + } + } else { + maxFailuresTimer.Reset(maxFailuresDuration) + + if !compareStatuses(currentStatus, statuses) { + currentStatus = statuses + reportStatus(procCtx, statusCh, statuses) + } + } + + select { + case <-procCtx.Done(): + reportStatus(ctx, statusCh, aggregateStatus(componentstatus.StatusStopped, nil)) + return + case <-healthCheckPollTimer.C: + healthCheckPollTimer.Reset(healthCheckPollDuration) + case <-maxFailuresTimer.C: + failedToConnectStatuses := aggregateStatus( + componentstatus.StatusRecoverableError, + errors.New("failed to connect to collector"), + ) + if !compareStatuses(currentStatus, failedToConnectStatuses) { + currentStatus = statuses + reportStatus(procCtx, statusCh, statuses) + } + } + } + }() + + go func() { + procState, procErr := processInfo.Process.Wait() + procCtxCancel() + <-healthCheckDone + close(ctl.processDoneCh) + // using ctx instead of procCtx in the reportErr functions below is intentional. This allows us to report + // errors to the caller through processErrCh and essentially discard any other errors that occurred because + // the process exited. + if procErr == nil { + if procState.Success() { + // report nil error so that the caller can be notified that the process has exited without error + reportErr(ctx, processErrCh, nil) + } else { + reportErr(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String())) + } + return + } + + reportErr(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr)) + }() + + return ctl, nil +} + +type procHandle struct { + processDoneCh chan struct{} + processInfo *process.Info +} + +// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within +// processKillAfter or due to an error, it will be killed. +func (s *procHandle) Stop(ctx context.Context) { + select { + case <-s.processDoneCh: + // process has already exited + return + default: + } + + if err := s.processInfo.Stop(); err != nil { + // we failed to stop the process just kill it and return + _ = s.processInfo.Kill() + return + } + + select { + case <-ctx.Done(): + // our caller ctx is Done; kill the process just in case + _ = s.processInfo.Kill() + case <-s.processDoneCh: + // process has already exited + case <-time.After(processKillAfter): + // process is still running kill it + _ = s.processInfo.Kill() + } +} diff --git a/internal/pkg/otel/manager/extension.go b/internal/pkg/otel/manager/extension.go index 59d968df053..560282ea4f1 100644 --- a/internal/pkg/otel/manager/extension.go +++ b/internal/pkg/otel/manager/extension.go @@ -22,7 +22,7 @@ type evtPair struct { } type AgentStatusExtension struct { - mgr *OTelManager + statusCh chan *status.AggregateStatus telemetry component.TelemetrySettings aggregator *status.Aggregator eventCh chan *evtPair @@ -39,11 +39,11 @@ var _ componentstatus.Watcher = (*AgentStatusExtension)(nil) // NewAgentStatusExtension returns the agent_status extension to be used by the // OTel collector when running in hybrid mode. -func NewAgentStatusExtension(ctx context.Context, set extension.Settings, mgr *OTelManager) *AgentStatusExtension { +func NewAgentStatusExtension(ctx context.Context, set extension.Settings, statusCh chan *status.AggregateStatus) *AgentStatusExtension { ctx, cancel := context.WithCancel(ctx) aggregator := status.NewAggregator(status.PriorityRecoverable) as := &AgentStatusExtension{ - mgr: mgr, + statusCh: statusCh, telemetry: set.TelemetrySettings, aggregator: aggregator, eventCh: make(chan *evtPair), @@ -60,14 +60,14 @@ func NewAgentStatusExtension(ctx context.Context, set extension.Settings, mgr *O } // NewAgentStatusFactory provides a factory for creating the AgentStatusExtension. -func NewAgentStatusFactory(mgr *OTelManager) extension.Factory { +func NewAgentStatusFactory(statusCh chan *status.AggregateStatus) extension.Factory { return extension.NewFactory( AgentStatusExtensionType, func() component.Config { return nil }, func(ctx context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { - return NewAgentStatusExtension(ctx, set, mgr), nil + return NewAgentStatusExtension(ctx, set, statusCh), nil }, component.StabilityLevelDevelopment, ) @@ -184,5 +184,5 @@ func (as *AgentStatusExtension) triggerKickCh() { func (as *AgentStatusExtension) publishStatus() { current, _ := as.aggregator.AggregateStatus(status.ScopeAll, status.Verbose) - as.mgr.statusCh <- current + as.statusCh <- current } diff --git a/internal/pkg/otel/manager/healthcheck.go b/internal/pkg/otel/manager/healthcheck.go new file mode 100644 index 00000000000..e2e8eedf1d0 --- /dev/null +++ b/internal/pkg/otel/manager/healthcheck.go @@ -0,0 +1,274 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/gofrs/uuid/v5" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component/componentstatus" +) + +const ( + // healthcheckv2 extension configuration settings + // https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/healthcheckv2extension + healthCheckExtensionName = "healthcheckv2" + healthCheckIncludePermanentErrors = true + healthCheckIncludeRecoverableErrors = true + healthCheckRecoveryDuration = "5m" + healthCheckHealthStatusPath = "/health/status" + healthCheckHealthStatusEnabled = true + healthCheckHealthConfigPath = "/health/config" + healthCheckHealthConfigEnabled = false +) + +// SerializableStatus is exported for json.Unmarshal +type SerializableStatus struct { + StartTimestamp *time.Time `json:"start_time,omitempty"` + *SerializableEvent + ComponentStatuses map[string]*SerializableStatus `json:"components,omitempty"` +} + +// SerializableEvent is exported for json.Unmarshal +type SerializableEvent struct { + Healthy bool `json:"healthy"` + StatusString string `json:"status"` + Error string `json:"error,omitempty"` + Timestamp time.Time `json:"status_time"` +} + +// stringToStatusMap is a map from string representation of status to componentstatus.Status. +var stringToStatusMap = map[string]componentstatus.Status{ + "StatusNone": componentstatus.StatusNone, + "StatusStarting": componentstatus.StatusStarting, + "StatusOK": componentstatus.StatusOK, + "StatusRecoverableError": componentstatus.StatusRecoverableError, + "StatusPermanentError": componentstatus.StatusPermanentError, + "StatusFatalError": componentstatus.StatusFatalError, + "StatusStopping": componentstatus.StatusStopping, + "StatusStopped": componentstatus.StatusStopped, +} + +// healthCheckEvent implements status.Event interface for health check events. +type healthCheckEvent struct { + status componentstatus.Status + timestamp time.Time + err error +} + +func (e *healthCheckEvent) Status() componentstatus.Status { return e.status } +func (e *healthCheckEvent) Timestamp() time.Time { return e.timestamp } +func (e *healthCheckEvent) Err() error { return e.err } + +// AllComponentsStatuses retrieves the status of all components from the health check endpoint. +func AllComponentsStatuses(ctx context.Context, httpHealthCheckPort int) (*status.AggregateStatus, error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d/%s?verbose", + httpHealthCheckPort, healthCheckHealthStatusPath), nil) + if err != nil { + return nil, fmt.Errorf("failed to create http request: %w", err) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to get status: %w", err) + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read body: %w", err) + } + + serStatus := &SerializableStatus{} + err = json.Unmarshal(body, serStatus) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal serializable status: %w", err) + } + + return fromSerializableStatus(serStatus), nil +} + +// fromSerializableStatus reconstructs an AggregateStatus from serializableStatus. +func fromSerializableStatus(ss *SerializableStatus) *status.AggregateStatus { + ev := fromSerializableEvent(ss.SerializableEvent) + + as := &status.AggregateStatus{ + Event: ev, + ComponentStatusMap: make(map[string]*status.AggregateStatus), + } + + for k, cs := range ss.ComponentStatuses { + as.ComponentStatusMap[k] = fromSerializableStatus(cs) + } + + return as +} + +// fromSerializableEvent reconstructs a status.Event from SerializableEvent. +func fromSerializableEvent(se *SerializableEvent) status.Event { + if se == nil { + return nil + } + + var err error + if se.Error != "" { + err = errors.New(se.Error) + } + + statusVal, ok := stringToStatusMap[se.StatusString] + if !ok { + statusVal = componentstatus.StatusNone + } + + return &healthCheckEvent{ + status: statusVal, + timestamp: se.Timestamp, + err: err, + } +} + +// compareStatuses checks if two AggregateStatuses are equal, excluding timestamp. +func compareStatuses(s1, s2 *status.AggregateStatus) bool { + if s1 == nil && s2 == nil { + // both nil + return true + } + if s1 == nil || s2 == nil { + // one of them is nil + return false + } + if s1.Status() != s2.Status() { + // status doesn't match + return false + } + + // NOTE: we don't check the timestamp + // as we care only about the event and component statuses/error differences + + if (s1.Err() == nil && s2.Err() != nil) || (s1.Err() != nil && s2.Err() == nil) { + return false + } + if s1.Err() != nil && s2.Err() != nil { + if s1.Err().Error() != s2.Err().Error() { + return false + } + } + + if len(s1.ComponentStatusMap) != len(s2.ComponentStatusMap) { + return false + } + for k, v1 := range s1.ComponentStatusMap { + v2, ok := s2.ComponentStatusMap[k] + if !ok { + return false + } + if !compareStatuses(v1, v2) { + return false + } + } + return true +} + +// aggregateStatus creates a new AggregateStatus with the provided component status and error. +func aggregateStatus(sts componentstatus.Status, err error) *status.AggregateStatus { + return &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: sts, + timestamp: time.Now(), + err: err, + }, + ComponentStatusMap: make(map[string]*status.AggregateStatus), + } +} + +// injectHeathCheckV2Extension injects the healthcheckv2 extension into the provided configuration. +func injectHeathCheckV2Extension(conf *confmap.Conf, httpHealthCheckPort int) error { + nsUUID, err := uuid.NewV4() + if err != nil { + return fmt.Errorf("cannot generate UUID V4: %w", err) + } + componentType := component.MustNewType(healthCheckExtensionName) + healthCheckExtensionID := component.NewIDWithName(componentType, nsUUID.String()).String() + err = conf.Merge(confmap.NewFromStringMap(map[string]interface{}{ + "extensions": map[string]interface{}{ + healthCheckExtensionID: map[string]interface{}{ + "use_v2": true, + "component_health": map[string]interface{}{ + "include_permanent_errors": healthCheckIncludePermanentErrors, + "include_recoverable_errors": healthCheckIncludeRecoverableErrors, + "recovery_duration": healthCheckRecoveryDuration, + }, + "http": map[string]interface{}{ + "endpoint": fmt.Sprintf("localhost:%d", httpHealthCheckPort), + "status": map[string]interface{}{ + "enabled": healthCheckHealthStatusEnabled, + "path": healthCheckHealthStatusPath, + }, + "config": map[string]interface{}{ + "enabled": healthCheckHealthConfigEnabled, + "path": healthCheckHealthConfigPath, + }, + }, + }, + }, + })) + if err != nil { + return fmt.Errorf("merge into extensions failed: %w", err) + } + serviceConf, err := conf.Sub("service") + if err != nil { + //nolint:nilerr // ignore the error, no service defined in the configuration + // this is going to error by the collector any way no reason to pollute with more + // error information that is not really related to the issue at the moment + return nil + } + extensionsRaw := serviceConf.Get("extensions") + if extensionsRaw == nil { + // no extensions defined on service (easily add it) + err = conf.Merge(confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "extensions": []interface{}{healthCheckExtensionID}, + }, + })) + if err != nil { + return fmt.Errorf("merge into service::extensions failed: %w", err) + } + return nil + } + extensionsSlice, ok := extensionsRaw.([]interface{}) + if !ok { + return fmt.Errorf("merge into service::extensions failed: expected []interface{}, got %T", extensionsRaw) + } + for _, extensionRaw := range extensionsSlice { + extension, ok := extensionRaw.(string) + if ok && extension == healthCheckExtensionID { + // already present, nothing to do + return nil + } + } + extensionsSlice = append(extensionsSlice, healthCheckExtensionID) + err = conf.Merge(confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "extensions": extensionsSlice, + }, + })) + if err != nil { + return fmt.Errorf("merge into service::extensions failed: %w", err) + } + return nil +} diff --git a/internal/pkg/otel/manager/healthcheck_test.go b/internal/pkg/otel/manager/healthcheck_test.go new file mode 100644 index 00000000000..73b01f51316 --- /dev/null +++ b/internal/pkg/otel/manager/healthcheck_test.go @@ -0,0 +1,254 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "errors" + "testing" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component/componentstatus" +) + +func TestCompareAggregateStatuses(t *testing.T) { + timestamp := time.Now() + for _, tc := range []struct { + name string + s1, s2 *status.AggregateStatus + expected bool + }{ + { + name: "equal statuses", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + expected: true, + }, + { + name: "unequal statuses", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusPermanentError, + timestamp: timestamp, + err: nil, + }, + }, + expected: false, + }, + { + name: "unequal errors", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: errors.New("error"), + }, + }, + expected: false, + }, + { + name: "unequal component statuses", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusStopped, + timestamp: timestamp, + err: nil, + }, + }, + }, + }, + expected: false, + }, + { + name: "more components", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + "component2": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + }, + }, + expected: false, + }, + { + name: "completely different components", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component3": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + }, + }, + expected: false, + }, + { + name: "unequal component errors", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: errors.New("error1"), + }, + }, + }, + }, + s2: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + ComponentStatusMap: map[string]*status.AggregateStatus{ + "component1": { + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: errors.New("error2"), + }, + }, + }, + }, + expected: false, + }, + { + name: "both nil", + s1: nil, + s2: nil, + expected: true, + }, + { + name: "one nil", + s1: &status.AggregateStatus{ + Event: &healthCheckEvent{ + status: componentstatus.StatusOK, + timestamp: timestamp, + err: nil, + }, + }, + s2: nil, + expected: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + actual := compareStatuses(tc.s1, tc.s2) + if actual != tc.expected { + t.Errorf("expected %v, got %v", tc.expected, actual) + } + }) + } +} diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index 17b33a41482..9a9a77f34bc 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -7,19 +7,39 @@ package manager import ( "context" "errors" + "fmt" + "os" + "sync/atomic" + "time" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/confmap" - "go.opentelemetry.io/collector/otelcol" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "github.com/elastic/elastic-agent/internal/pkg/otel" - "github.com/elastic/elastic-agent/internal/pkg/otel/agentprovider" - "github.com/elastic/elastic-agent/internal/pkg/release" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/pkg/core/logger" ) +type ExecutionMode string + +const ( + SubprocessExecutionMode ExecutionMode = "subprocess" + EmbeddedExecutionMode ExecutionMode = "embedded" +) + +type collectorRecoveryTimer interface { + // IsStopped returns true if the timer is stopped + IsStopped() bool + // Stop stops the timer + Stop() + // ResetInitial resets the timer to the initial interval + ResetInitial() time.Duration + // ResetNext resets the timer to the next interval + ResetNext() time.Duration + // C returns the timer channel + C() <-chan time.Time +} + // OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent. type OTelManager struct { // baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields. @@ -41,55 +61,132 @@ type OTelManager struct { // doneChan is closed when Run is stopped to signal that any // pending update calls should be ignored. doneChan chan struct{} + + // recoveryTimer is used to restart the collector when it has errored. + recoveryTimer collectorRecoveryTimer + + // recoveryRetries is the number of times the collector has been + // restarted through the recovery timer. + recoveryRetries atomic.Uint32 + + // execution is used to invoke the collector into different execution modes + execution collectorExecution } // NewOTelManager returns a OTelManager. -func NewOTelManager(logger, baseLogger *logger.Logger) *OTelManager { - return &OTelManager{ - logger: logger, - baseLogger: baseLogger, - errCh: make(chan error, 1), // holds at most one error - cfgCh: make(chan *confmap.Conf), - statusCh: make(chan *status.AggregateStatus), - doneChan: make(chan struct{}), +func NewOTelManager(logger *logger.Logger, logLevel logp.Level, baseLogger *logger.Logger, mode ExecutionMode) (*OTelManager, error) { + var exec collectorExecution + var recoveryTimer collectorRecoveryTimer + switch mode { + case SubprocessExecutionMode: + // NOTE: if we stop embedding the collector binary in elastic-agent, we need to + // change this + executable, err := os.Executable() + if err != nil { + return nil, fmt.Errorf("failed to get the path to the collector executable: %w", err) + } + recoveryTimer = newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute) + exec = newSubprocessExecution(logLevel, executable) + case EmbeddedExecutionMode: + recoveryTimer = newRestarterNoop() + exec = newExecutionEmbedded() + default: + return nil, errors.New("unknown otel collector exec") } + + logger.Debugf("Using collector execution mode: %s", mode) + + return &OTelManager{ + logger: logger, + baseLogger: baseLogger, + errCh: make(chan error, 1), // holds at most one error + cfgCh: make(chan *confmap.Conf), + statusCh: make(chan *status.AggregateStatus), + doneChan: make(chan struct{}), + execution: exec, + recoveryTimer: recoveryTimer, + }, nil } // Run runs the lifecycle of the manager. func (m *OTelManager) Run(ctx context.Context) error { - var err error - var cancel context.CancelFunc - var provider *agentprovider.Provider + var ( + err error + proc collectorHandle + ) // signal that the run loop is ended to unblock any incoming update calls defer close(m.doneChan) - runErrCh := make(chan error) + // collectorRunErr is used to signal that the collector has exited. + collectorRunErr := make(chan error) for { select { case <-ctx.Done(): - if cancel != nil { - cancel() - <-runErrCh // wait for collector to be stopped + m.recoveryTimer.Stop() + // our caller context is cancelled so stop the collector and return + // has exited. + if proc != nil { + proc.Stop(ctx) } return ctx.Err() - case err = <-runErrCh: + case <-m.recoveryTimer.C(): + m.recoveryTimer.Stop() + + if m.cfg == nil || proc != nil || ctx.Err() != nil { + // no configuration, or the collector is already running, or the context + // is cancelled. + continue + } + + newRetries := m.recoveryRetries.Add(1) + m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) + proc, err = m.execution.startCollector(ctx, m.logger, m.cfg, collectorRunErr, m.statusCh) + if err != nil { + reportErr(ctx, m.errCh, err) + // reset the restart timer to the next backoff + recoveryDelay := m.recoveryTimer.ResetNext() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) + } else { + reportErr(ctx, m.errCh, nil) + } + + case err = <-collectorRunErr: + m.recoveryTimer.Stop() if err == nil { - // err is nil but there is a configuration - // + // err is nil means that the collector has exited cleanly without an error + if proc != nil { + proc.Stop(ctx) + proc = nil + reportStatus(ctx, m.statusCh, nil) + } + + if m.cfg == nil { + // no configuration then the collector should not be + // running. + // ensure that the coordinator knows that there is no error + // as the collector is not running anymore + reportErr(ctx, m.errCh, nil) + continue + } + + m.logger.Warnf("collector exited without an error but a configuration was provided") + // in this rare case the collector stopped running but a configuration was // provided and the collector stopped with a clean exit - cancel() - cancel, provider, err = m.startCollector(m.cfg, runErrCh) + proc, err = m.execution.startCollector(ctx, m.logger, m.cfg, collectorRunErr, m.statusCh) if err != nil { // failed to create the collector (this is different then // it's failing to run). we do not retry creation on failure // as it will always fail a new configuration is required for // it not to fail (a new configuration will result in the retry) - m.reportErr(ctx, err) + reportErr(ctx, m.errCh, err) + // reset the restart timer to the next backoff + recoveryDelay := m.recoveryTimer.ResetNext() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) } else { // all good at the moment (possible that it will fail) - m.reportErr(ctx, nil) + reportErr(ctx, m.errCh, nil) } } else { // error occurred while running the collector, this occurs in the @@ -98,65 +195,64 @@ func (m *OTelManager) Run(ctx context.Context) error { // // in the case that the configuration is invalid there is no reason to // try again as it will keep failing so we do not trigger a restart - if cancel != nil { - cancel() - cancel = nil - provider = nil - // don't wait here for <-runErrCh, already occurred + if proc != nil { + proc.Stop(ctx) + proc = nil + // don't wait here for <-collectorRunErr, already occurred // clear status, no longer running - select { - case m.statusCh <- nil: - case <-ctx.Done(): - } + reportStatus(ctx, m.statusCh, nil) } // pass the error to the errCh so the coordinator, unless it's a cancel error if !errors.Is(err, context.Canceled) { - m.reportErr(ctx, err) + reportErr(ctx, m.errCh, err) + // reset the restart timer to the next backoff + recoveryDelay := m.recoveryTimer.ResetNext() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) } } + case cfg := <-m.cfgCh: + // we received a new configuration, thus stop the recovery timer + // and reset the retry count + m.recoveryTimer.Stop() + m.recoveryRetries.Store(0) m.cfg = cfg + + if proc != nil { + proc.Stop(ctx) + proc = nil + select { + case <-collectorRunErr: + case <-ctx.Done(): + // our caller ctx is Done + return ctx.Err() + } + reportStatus(ctx, m.statusCh, nil) + } + if cfg == nil { // no configuration then the collector should not be - // running. if a cancel exists then it is running - // this cancels the context that will stop the running - // collector (this configuration does not get passed - // to the agent provider as an update) - if cancel != nil { - cancel() - cancel = nil - provider = nil - <-runErrCh // wait for collector to be stopped - // clear status, no longer running - select { - case m.statusCh <- nil: - case <-ctx.Done(): - } - } + // running. // ensure that the coordinator knows that there is no error // as the collector is not running anymore - m.reportErr(ctx, nil) + reportErr(ctx, m.errCh, nil) } else { // either a new configuration or the first configuration // that results in the collector being started - if cancel == nil { - // no cancel exists so the collector has not been - // started. start the collector with this configuration - cancel, provider, err = m.startCollector(m.cfg, runErrCh) - if err != nil { - // failed to create the collector (this is different then - // it's failing to run). we do not retry creation on failure - // as it will always fail a new configuration is required for - // it not to fail (a new configuration will result in the retry) - m.reportErr(ctx, err) - } else { - // all good at the moment (possible that it will fail) - m.reportErr(ctx, nil) - } + proc, err = m.execution.startCollector(ctx, m.logger, m.cfg, collectorRunErr, m.statusCh) + if err != nil { + // failed to create the collector (this is different then + // it's failing to run). we do not retry creation on failure + // as it will always fail. A new configuration is required for + // it not to fail (a new configuration will result in the retry) + reportErr(ctx, m.errCh, err) + // since this is a new configuration we want to start the timer + // from the initial delay + recoveryDelay := m.recoveryTimer.ResetInitial() + m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) } else { - // collector is already running so only the configuration - // needs to be updated in the collector - provider.Update(m.cfg) + // all good at the moment (possible that it will fail) + reportErr(ctx, m.errCh, nil) } } } @@ -185,47 +281,3 @@ func (m *OTelManager) Update(cfg *confmap.Conf) { func (m *OTelManager) Watch() <-chan *status.AggregateStatus { return m.statusCh } - -func (m *OTelManager) startCollector(cfg *confmap.Conf, errCh chan error) (context.CancelFunc, *agentprovider.Provider, error) { - ctx, cancel := context.WithCancel(context.Background()) - ap := agentprovider.NewProvider(cfg) - - // NewForceExtensionConverterFactory is used to ensure that the agent_status extension is always enabled. - // It is required for the Elastic Agent to extract the status out of the OTel collector. - settings := otel.NewSettings( - release.Version(), []string{ap.URI()}, - otel.WithConfigProviderFactory(ap.NewFactory()), - otel.WithConfigConvertorFactory(NewForceExtensionConverterFactory(AgentStatusExtensionType.String())), - otel.WithExtensionFactory(NewAgentStatusFactory(m))) - settings.DisableGracefulShutdown = true // managed by this manager - settings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core { - return m.baseLogger.Core() // use the base logger also used for logs from the command runtime - })} - svc, err := otelcol.NewCollector(*settings) - if err != nil { - cancel() - return nil, nil, err - } - go func() { - errCh <- svc.Run(ctx) - }() - return cancel, ap, nil -} - -// reportErr reports an error to the service that is controlling this manager -// -// the manager can be blocked doing other work like sending this manager a new configuration -// so we do not want error reporting to be a blocking send over the channel -// -// the manager really only needs the most recent error, so if it misses an error it's not a big -// deal, what matters is that it has the current error for the state of this manager -func (m *OTelManager) reportErr(ctx context.Context, err error) { - select { - case <-m.errCh: - default: - } - select { - case m.errCh <- err: - case <-ctx.Done(): - } -} diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index 161cb86eeb8..26591588b1f 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -9,19 +9,22 @@ package manager import ( "context" "errors" + "os" + "path/filepath" "sync" "testing" "time" - "gopkg.in/yaml.v2" - + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" + "gopkg.in/yaml.v2" + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/core/logger/loggertest" ) @@ -64,219 +67,456 @@ var ( } ) -func TestOTelManager_Run(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - base, _ := loggertest.New("otel") - l, _ := loggertest.New("otel-manager") - m := NewOTelManager(l, base) +type mockExecution struct { + mtx sync.Mutex + exec collectorExecution + handle collectorHandle +} + +func (m *mockExecution) startCollector(ctx context.Context, logger *logger.Logger, cfg *confmap.Conf, errCh chan error, statusCh chan *status.AggregateStatus) (collectorHandle, error) { + m.mtx.Lock() + defer m.mtx.Unlock() - var errMx sync.Mutex var err error - go func() { - for { - select { - case <-ctx.Done(): - return - case e := <-m.Errors(): - if e != nil { - // no error should be produced (any error is a failure) - errMx.Lock() - err = e - errMx.Unlock() - } - } - } - }() - getLatestErr := func() error { - errMx.Lock() - defer errMx.Unlock() - return err - } + m.handle, err = m.exec.startCollector(ctx, logger, cfg, errCh, statusCh) + return m.handle, err +} - var latestMx sync.Mutex - var latest *status.AggregateStatus - go func() { - for { - select { - case <-ctx.Done(): - return - case c := <-m.Watch(): - latestMx.Lock() - latest = c - latestMx.Unlock() - } - } - }() - getLatestStatus := func() *status.AggregateStatus { - latestMx.Lock() - defer latestMx.Unlock() - return latest - } +func (m *mockExecution) getProcessHandle() collectorHandle { + m.mtx.Lock() + defer m.mtx.Unlock() - var runWg sync.WaitGroup - var runErr error - runWg.Add(1) - go func() { - defer runWg.Done() - runErr = m.Run(ctx) - }() - - ensureHealthy := func() { - if !assert.Eventuallyf(t, func() bool { - err := getLatestErr() - if err != nil { - // return now (but not for the correct reasons) - return true - } - latest := getLatestStatus() - if latest == nil || latest.Status() != componentstatus.StatusOK { - return false - } - return true - }, 5*time.Minute, 1*time.Second, "otel collector never got healthy") { - lastStatus := getLatestStatus() - lastErr := getLatestErr() + return m.handle +} - // never got healthy, stop the manager and wait for it to end - cancel() - runWg.Wait() +// EventListener listens to the events from the OTelManager and stores the latest error and status. +type EventListener struct { + mtx sync.Mutex + err *EventTime[error] + status *EventTime[*status.AggregateStatus] +} - // if a run error happened then report that - if !errors.Is(runErr, context.Canceled) { - t.Fatalf("otel manager never got healthy and the otel manager returned unexpected error: %v (latest status: %+v) (latest err: %v)", runErr, lastStatus, lastErr) - } - t.Fatalf("otel collector never got healthy: %s (latest err: %v)", statusToYaml(lastStatus), lastErr) +// Listen starts listening to the error and status channels. It updates the latest error and status in the +// EventListener. +func (e *EventListener) Listen(ctx context.Context, errorCh <-chan error, statusCh <-chan *status.AggregateStatus) { + for { + select { + case <-ctx.Done(): + return + case c := <-statusCh: + e.mtx.Lock() + e.status = &EventTime[*status.AggregateStatus]{val: c, time: time.Now()} + e.mtx.Unlock() + case c := <-errorCh: + e.mtx.Lock() + e.err = &EventTime[error]{val: c, time: time.Now()} + e.mtx.Unlock() } - latestErr := getLatestErr() - require.NoError(t, latestErr, "runtime errored") } +} - ensureOff := func() { - require.Eventuallyf(t, func() bool { - err := getLatestErr() - if err != nil { - // return now (but not for the correct reasons) - return true - } - latest := getLatestStatus() - return latest == nil - }, 5*time.Minute, 1*time.Second, "otel collector never stopped") - latestErr := getLatestErr() - require.NoError(t, latestErr, "runtime errored") - } +// getError retrieves the latest error from the EventListener. +func (e *EventListener) getError() error { + e.mtx.Lock() + defer e.mtx.Unlock() + return e.err.Value() +} - // ensure that it got healthy - cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg) - ensureHealthy() +// getStatus retrieves the latest status from the EventListener. +func (e *EventListener) getStatus() *status.AggregateStatus { + e.mtx.Lock() + defer e.mtx.Unlock() + return e.status.Value() +} - // trigger update (no config compare is due externally to otel collector) - m.Update(cfg) - ensureHealthy() +// EnsureHealthy ensures that the OTelManager is healthy by checking the latest error and status. +func (e *EventListener) EnsureHealthy(t *testing.T, u time.Time) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.status + e.mtx.Unlock() + + // we expect to have a reported error which is nil and a reported status which is StatusOK + require.False(collect, latestErr == nil || latestErr.Before(u) || latestErr.Value() != nil) + require.False(collect, latestStatus == nil || latestStatus.Value() == nil || latestStatus.Before(u) || latestStatus.Value().Status() != componentstatus.StatusOK) + }, 60*time.Second, 1*time.Second, "otel collector never got healthy") +} - // no configuration should stop the runner - m.Update(nil) - ensureOff() +// EnsureOffWithoutError ensures that the OTelManager is off without an error by checking the latest error and status. +func (e *EventListener) EnsureOffWithoutError(t *testing.T, u time.Time) { + require.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.status + e.mtx.Unlock() + + // we expect to have a reported error which is nil and a reported status which is nil + require.False(collect, latestErr == nil || latestErr.Before(u) || latestErr.Value() != nil) + require.False(collect, latestStatus == nil || latestStatus.Before(u) || latestStatus.Value() != nil) + }, 60*time.Second, 1*time.Second, "otel collector never stopped without an error") +} - cancel() - runWg.Wait() - if !errors.Is(runErr, context.Canceled) { - t.Errorf("otel manager returned unexpected error: %v", runErr) - } +// EnsureOffWithError ensures that the OTelManager is off with an error by checking the latest error and status. +func (e *EventListener) EnsureOffWithError(t *testing.T, u time.Time) { + require.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.status + e.mtx.Unlock() + + // we expect to have a reported error which is not nil and a reported status which is nil + require.False(collect, latestErr == nil || latestErr.Before(u) || latestErr.Value() == nil) + require.False(collect, latestStatus == nil || latestStatus.Before(u) || latestStatus.Value() != nil) + }, 60*time.Second, 1*time.Second, "otel collector never errored with an error") } -func TestOTelManager_ConfigError(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - base, _ := loggertest.New("otel") - l, _ := loggertest.New("otel-manager") - m := NewOTelManager(l, base) - - go func() { - err := m.Run(ctx) - assert.ErrorIs(t, err, context.Canceled, "otel manager should be cancelled") - }() - - // watch is synchronous, so we need to read from it to avoid blocking the manager - go func() { - for { - select { - case <-m.Watch(): - case <-ctx.Done(): - return - } - } - }() +// EventTime is a wrapper around a time.Time and a value of type T. It provides methods to compare the time and retrieve the value. +type EventTime[T interface{}] struct { + time time.Time + val T +} - // Errors channel is non-blocking, should be able to send an Update that causes an error multiple - // times without it blocking on sending over the errCh. - for range 3 { - cfg := confmap.New() // invalid config - m.Update(cfg) +// Before checks if the EventTime's time is before the given time u. +func (t *EventTime[T]) Before(u time.Time) bool { + return t != nil && t.time.Before(u) +} - // delay between updates to ensure the collector will have to fail - <-time.After(100 * time.Millisecond) +// Value retrieves the value of type T from the EventTime. If the EventTime is nil, it returns the zero value of T. +func (t *EventTime[T]) Value() T { + if t == nil { + var zero T + return zero } + return t.val +} - // because of the retry logic and timing we need to ensure - // that this keeps retrying to see the error and only store - // an actual error - // - // a nil error just means that the collector is trying to restart - // which clears the error on the restart loop - timeoutCh := time.After(time.Second * 5) - var err error -outer: - for { - select { - case e := <-m.Errors(): - if e != nil { - err = e - break outer - } - case <-timeoutCh: - break outer - } +// Time retrieves the time associated with the EventTime. If the EventTime is nil, it returns the zero value of time.Time. +func (t *EventTime[T]) Time() time.Time { + if t == nil { + return time.Time{} } - assert.Error(t, err, "otel manager should have returned an error") + return t.time } -func TestOTelManager_Logging(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - base, obs := loggertest.New("otel") - l, _ := loggertest.New("otel-manager") - m := NewOTelManager(l, base) - - go func() { - err := m.Run(ctx) - assert.ErrorIs(t, err, context.Canceled, "otel manager should be cancelled") - }() - - // watch is synchronous, so we need to read from it to avoid blocking the manager - go func() { - for { - select { - case <-m.Watch(): - case <-ctx.Done(): - return +func TestOTelManager_Run(t *testing.T) { + wd, erWd := os.Getwd() + require.NoError(t, erWd, "cannot get working directory") + + testBinary := filepath.Join(wd, "testing", "testing") + require.FileExists(t, testBinary, "testing binary not found") + + for _, tc := range []struct { + name string + exec *mockExecution + restarter collectorRecoveryTimer + skipListeningErrors bool + testFn func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) + }{ + { + name: "embedded collector config updates", + exec: &mockExecution{exec: newExecutionEmbedded()}, + restarter: newRestarterNoop(), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + // trigger update (no config compare is due externally to otel collector) + updateTime = time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil) + e.EnsureOffWithoutError(t, updateTime) + require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + }, + }, + { + name: "subprocess collector config updates", + exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + // trigger update (no config compare is due externally to otel collector) + updateTime = time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil) + e.EnsureOffWithoutError(t, updateTime) + assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") + }, + }, + { + name: "embedded collector stopped gracefully outside manager", + exec: &mockExecution{exec: newExecutionEmbedded()}, + restarter: newRestarterNoop(), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + // stop it, this should be restarted by the manager + updateTime = time.Now() + require.NotNil(t, exec.handle, "exec handle should not be nil") + exec.handle.Stop(t.Context()) + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil) + e.EnsureOffWithoutError(t, updateTime) + require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + }, + }, + { + name: "subprocess collector stopped gracefully outside manager", + exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + // stop it, this should be restarted by the manager + updateTime = time.Now() + require.NotNil(t, exec.handle, "exec handle should not be nil") + exec.handle.Stop(t.Context()) + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil) + e.EnsureOffWithoutError(t, updateTime) + assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.Equal(t, uint32(0), m.recoveryRetries.Load(), "recovery retries should be 0") + }, + }, + { + name: "subprocess collector killed outside manager", + exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + updateTime := time.Now() + m.Update(cfg) + e.EnsureHealthy(t, updateTime) + + var oldPHandle *procHandle + // repeatedly kill the collector + for i := 0; i < 3; i++ { + // kill it + handle := exec.getProcessHandle() + require.NotNil(t, handle, "exec handle should not be nil, iteration ", i) + pHandle, ok := handle.(*procHandle) + require.True(t, ok, "exec handle should be of type procHandle, iteration ", i) + if oldPHandle != nil { + require.NotEqual(t, pHandle.processInfo.PID, oldPHandle.processInfo.PID, "processes PIDs should be different, iteration ", i) + } + oldPHandle = pHandle + require.NoError(t, pHandle.processInfo.Kill(), "failed to kill collector process, iteration ", i) + // the collector should restart and report healthy + updateTime = time.Now() + e.EnsureHealthy(t, updateTime) + } + + seenRecoveredTimes := m.recoveryRetries.Load() + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil) + e.EnsureOffWithoutError(t, updateTime) + assert.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.Equal(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") + }, + }, + { + name: "subprocess collector panics", + exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", (3 * time.Second).String()) + require.NoError(t, err, "failed to set TEST_SUPERVISED_COLLECTOR_PANIC env var") + t.Cleanup(func() { + _ = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") + }) + + // ensure that it got healthy + cfg := confmap.NewFromStringMap(testConfig) + m.Update(cfg) + + seenRecoveredTimes := uint32(0) + require.Eventually(t, func() bool { + seenRecoveredTimes = m.recoveryRetries.Load() + return seenRecoveredTimes > 2 + }, time.Minute, time.Second, "expected recovered times to be at least 3, got %d", seenRecoveredTimes) + + err = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") + require.NoError(t, err, "failed to unset TEST_SUPERVISED_COLLECTOR_PANIC env var") + updateTime := time.Now() + e.EnsureHealthy(t, updateTime) + + // no configuration should stop the runner + updateTime = time.Now() + m.Update(nil) + e.EnsureOffWithoutError(t, updateTime) + require.True(t, m.recoveryTimer.IsStopped(), "restart timer should be stopped") + assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") + }, + }, + { + name: "embedded collector invalid config", + exec: &mockExecution{exec: newExecutionEmbedded()}, + restarter: newRestarterNoop(), + skipListeningErrors: true, + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // Errors channel is non-blocking, should be able to send an Update that causes an error multiple + // times without it blocking on sending over the errCh. + for range 3 { + cfg := confmap.New() // invalid config + m.Update(cfg) + + // delay between updates to ensure the collector will have to fail + <-time.After(100 * time.Millisecond) + } + + // because of the retry logic and timing we need to ensure + // that this keeps retrying to see the error and only store + // an actual error + // + // a nil error just means that the collector is trying to restart + // which clears the error on the restart loop + timeoutCh := time.After(time.Second * 5) + var err error + outer: + for { + select { + case e := <-m.Errors(): + if e != nil { + err = e + break outer + } + case <-timeoutCh: + break outer + } + } + assert.Error(t, err, "otel manager should have returned an error") + }, + }, + { + name: "subprocess collector invalid config", + exec: &mockExecution{exec: newSubprocessExecution(logp.DebugLevel, testBinary)}, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + skipListeningErrors: true, + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *mockExecution) { + // Errors channel is non-blocking, should be able to send an Update that causes an error multiple + // times without it blocking on sending over the errCh. + for range 3 { + cfg := confmap.New() // invalid config + m.Update(cfg) + + // delay between updates to ensure the collector will have to fail + <-time.After(100 * time.Millisecond) + } + + // because of the retry logic and timing we need to ensure + // that this keeps retrying to see the error and only store + // an actual error + // + // a nil error just means that the collector is trying to restart + // which clears the error on the restart loop + timeoutCh := time.After(time.Second * 5) + var err error + outer: + for { + select { + case e := <-m.Errors(): + if e != nil { + err = e + break outer + } + case <-timeoutCh: + break outer + } + } + assert.Error(t, err, "otel manager should have returned an error") + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + l, _ := loggertest.New("otel") + base, _ := loggertest.New("otel") + m := &OTelManager{ + logger: l, + baseLogger: base, + errCh: make(chan error, 1), // holds at most one error + cfgCh: make(chan *confmap.Conf), + statusCh: make(chan *status.AggregateStatus), + doneChan: make(chan struct{}), + recoveryTimer: tc.restarter, + execution: tc.exec, } - } - }() - cfg := confmap.NewFromStringMap(testConfig) - m.Update(cfg) + eListener := &EventListener{} + defer func() { + if !t.Failed() { + return + } + t.Logf("latest received err: %s", eListener.getError()) + t.Logf("latest received status: %s", statusToYaml(eListener.getStatus())) + }() + + runWg := sync.WaitGroup{} + runWg.Add(1) + go func() { + defer runWg.Done() + if !tc.skipListeningErrors { + eListener.Listen(ctx, m.Errors(), m.Watch()) + } else { + eListener.Listen(ctx, nil, m.Watch()) + } + }() - // the collector should log to the base logger - assert.EventuallyWithT(t, func(collect *assert.CollectT) { - logs := obs.All() - require.NotEmpty(collect, logs, "Logs should not be empty") - firstMessage := logs[0].Message - assert.Equal(collect, firstMessage, "Setting up own telemetry...") - }, time.Second*10, time.Second) + var runErr error + runWg.Add(1) + go func() { + defer runWg.Done() + runErr = m.Run(ctx) + }() + + tc.testFn(t, m, eListener, tc.exec) + + cancel() + runWg.Wait() + if !errors.Is(runErr, context.Canceled) { + t.Errorf("otel manager returned unexpected error: %v", runErr) + } + }) + } } +// statusToYaml converts the status.AggregateStatus to a YAML string representation. func statusToYaml(s *status.AggregateStatus) string { printable := toSerializableStatus(s) yamlBytes, _ := yaml.Marshal(printable) diff --git a/internal/pkg/otel/manager/recovery_backoff.go b/internal/pkg/otel/manager/recovery_backoff.go new file mode 100644 index 00000000000..6b51eb8ee08 --- /dev/null +++ b/internal/pkg/otel/manager/recovery_backoff.go @@ -0,0 +1,87 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "time" + + "github.com/cenkalti/backoff/v5" +) + +type recoveryBackoff struct { + backoff *backoff.ExponentialBackOff + timer *time.Timer + prevReset time.Time + resetToInitial time.Duration + stopped bool +} + +// newRecoveryBackoff returns a new recoveryBackoff. +// - initialInterval: the initial backoff interval +// - maxInterval: the maximum backoff interval +// - resetToInitial: the duration after which ResetNext will reset to the initial backoff interval +// (if set to 0, the timer will never be reset to the initial interval) +func newRecoveryBackoff( + initialInterval time.Duration, + maxInterval time.Duration, + resetToInitial time.Duration, +) *recoveryBackoff { + timer := time.NewTimer(time.Second) + timer.Stop() + + return &recoveryBackoff{ + timer: timer, + stopped: true, + resetToInitial: resetToInitial, + backoff: &backoff.ExponentialBackOff{ + InitialInterval: initialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: maxInterval, + }, + } +} + +// IsStopped returns true if the timer is stopped +func (r *recoveryBackoff) IsStopped() bool { + return r.stopped +} + +// Stop stops the timer +func (r *recoveryBackoff) Stop() { + r.timer.Stop() + r.stopped = true +} + +// ResetInitial resets the timer to the initial interval and returns the duration that the timer was reset to +func (r *recoveryBackoff) ResetInitial() time.Duration { + r.backoff.Reset() + delay := r.backoff.NextBackOff() + r.timer.Reset(delay) + r.prevReset = time.Now() + r.stopped = false + return delay +} + +// C returns the timer channel +func (r *recoveryBackoff) C() <-chan time.Time { + return r.timer.C +} + +// ResetNext resets the timer to the next backoff interval and returns the duration that the timer was reset to. +// Note that this will reset to the initial interval if resetToInitial is set and the time since the previous +// ResetNext exceeds it. +func (r *recoveryBackoff) ResetNext() time.Duration { + if r.resetToInitial != 0 && time.Now().After(r.prevReset.Add(r.resetToInitial)) { + // resetToInitial is set and the time since the last reset exceeds resetToInitial, + // so reset the backoff to the initial interval + return r.ResetInitial() + } + + r.prevReset = time.Now() + delay := r.backoff.NextBackOff() + r.timer.Reset(delay) + return delay +} diff --git a/internal/pkg/otel/manager/recovery_backoff_test.go b/internal/pkg/otel/manager/recovery_backoff_test.go new file mode 100644 index 00000000000..d3999044bf1 --- /dev/null +++ b/internal/pkg/otel/manager/recovery_backoff_test.go @@ -0,0 +1,45 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRecoveryBackoff(t *testing.T) { + initialInterval := 100 * time.Nanosecond + maxInterval := 1 * time.Second + resetToInitial := 10 * time.Second + recovery := newRecoveryBackoff(initialInterval, maxInterval, resetToInitial) + recovery.backoff.RandomizationFactor = 0 + assert.True(t, recovery.stopped, "timer should be stopped when instantiated") + + delay := recovery.ResetInitial() + assert.False(t, recovery.stopped, "timer should not be stopped") + assert.Equal(t, initialInterval, delay, "timer reset duration should be the initial interval") + + delay = recovery.ResetNext() + assert.False(t, recovery.stopped, "timer should not be stopped") + assert.Greater(t, delay, initialInterval, "timer reset duration should be greater than the initial interval") + assert.Less(t, delay, maxInterval, "timer reset duration should be less than the max interval") + + // wait for resetToInitial to check that ResetNext will reset to the initial backoff + select { + case <-time.After(resetToInitial + 2*time.Second): + // add 2 extra seconds to account for jitter + case <-t.Context().Done(): + t.Fatal("timed out waiting for resetToInitial") + } + + delay = recovery.ResetNext() + assert.False(t, recovery.stopped, "timer should not be stopped") + assert.Equal(t, initialInterval, delay, "timer reset duration should be reset to the initial interval") + + recovery.Stop() + assert.True(t, recovery.stopped, "timer should be stopped") +} diff --git a/internal/pkg/otel/manager/recovery_noop.go b/internal/pkg/otel/manager/recovery_noop.go new file mode 100644 index 00000000000..76129ab0810 --- /dev/null +++ b/internal/pkg/otel/manager/recovery_noop.go @@ -0,0 +1,39 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package manager + +import "time" + +type recoveryNoop struct { +} + +// newRestarterNoop returns a noop recovery timer +func newRestarterNoop() *recoveryNoop { + return &recoveryNoop{} +} + +// IsStopped returns always true +func (r *recoveryNoop) IsStopped() bool { + return true +} + +// Stop has no effect on the noop recovery timer +func (r *recoveryNoop) Stop() { +} + +// ResetInitial has no effect on noop the recovery timer +func (r *recoveryNoop) ResetInitial() time.Duration { + return 0 +} + +// C returns always nil +func (r *recoveryNoop) C() <-chan time.Time { + return nil +} + +// ResetNext has no effect on the noop recovery timer +func (r *recoveryNoop) ResetNext() time.Duration { + return 0 +} diff --git a/internal/pkg/otel/manager/testing/testing.go b/internal/pkg/otel/manager/testing/testing.go new file mode 100644 index 00000000000..6ad11fd8644 --- /dev/null +++ b/internal/pkg/otel/manager/testing/testing.go @@ -0,0 +1,36 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. + +package main + +import ( + "context" + "errors" + "os" + "time" + + "github.com/elastic/elastic-agent/internal/pkg/agent/cmd" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if panicEnvVar := os.Getenv("TEST_SUPERVISED_COLLECTOR_PANIC"); panicEnvVar != "" { + panicDelay, err := time.ParseDuration(panicEnvVar) + if err != nil { + // fallback to 3 seconds + panicDelay = 3 * time.Second + } + time.AfterFunc(panicDelay, func() { + panic("test panic") + }) + } + + err := cmd.RunCollector(ctx, nil, true, "debug") + if err == nil || errors.Is(err, context.Canceled) { + os.Exit(0) + } + os.Exit(1) +} diff --git a/internal/pkg/otel/run.go b/internal/pkg/otel/run.go index f28e59c73a6..adf342765db 100644 --- a/internal/pkg/otel/run.go +++ b/internal/pkg/otel/run.go @@ -19,15 +19,12 @@ import ( "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/otelcol" - - "github.com/elastic/elastic-agent/internal/pkg/release" ) const buildDescription = "Elastic opentelemetry-collector distribution" -func Run(ctx context.Context, stop chan bool, configFiles []string) error { +func Run(ctx context.Context, stop chan bool, settings *otelcol.CollectorSettings) error { fmt.Fprintln(os.Stdout, "Starting in otel mode") - settings := NewSettings(release.Version(), configFiles) svc, err := otelcol.NewCollector(*settings) if err != nil { return err diff --git a/internal/pkg/otel/translate/status.go b/internal/pkg/otel/translate/status.go index d49cf645923..e04b57dbafe 100644 --- a/internal/pkg/otel/translate/status.go +++ b/internal/pkg/otel/translate/status.go @@ -63,6 +63,10 @@ func DropComponentStateFromOtelStatus(otelStatus *status.AggregateStatus) (*stat newStatus := deepCopyStatus(otelStatus) for pipelineStatusId := range newStatus.ComponentStatusMap { + if pipelineStatusId == "extensions" { + // we do not want to report extension status + continue + } pipelineId := &pipeline.ID{} componentKind, pipelineIdStr := parseEntityStatusId(pipelineStatusId) if componentKind != "pipeline" { @@ -90,6 +94,10 @@ func getOtelRuntimePipelineStatuses(otelStatus *status.AggregateStatus) (map[str pipelines := make(map[string]*status.AggregateStatus, len(otelStatus.ComponentStatusMap)) for pipelineStatusId, pipelineStatus := range otelStatus.ComponentStatusMap { + if pipelineStatusId == "extensions" { + // we do not want to report extension status + continue + } pipelineId := &pipeline.ID{} componentKind, pipelineIdStr := parseEntityStatusId(pipelineStatusId) if componentKind != "pipeline" { diff --git a/magefile.go b/magefile.go index 7f2f5b6d6a2..daf91d10ddd 100644 --- a/magefile.go +++ b/magefile.go @@ -436,6 +436,7 @@ func getTestBinariesPath() ([]string, error) { testBinaryPkgs := []string{ filepath.Join(wd, "pkg", "component", "fake", "component"), filepath.Join(wd, "internal", "pkg", "agent", "install", "testblocking"), + filepath.Join(wd, "internal", "pkg", "otel", "manager", "testing"), } return testBinaryPkgs, nil } diff --git a/pkg/component/runtime/log_writer.go b/pkg/component/runtime/log_writer.go index 57ffb4bf34b..c5bd1ce3990 100644 --- a/pkg/component/runtime/log_writer.go +++ b/pkg/component/runtime/log_writer.go @@ -60,6 +60,17 @@ func newLogWriter(core zapcoreWriter, logCfg component.CommandLogSpec, ll zapcor } } +func NewLogWriterWithDefaults(core zapcoreWriter, ll zapcore.Level) *logWriter { + cmdLogSpec := component.CommandLogSpec{} + cmdLogSpec.InitDefaults() + return &logWriter{ + loggerCore: core, + logCfg: cmdLogSpec, + logLevel: zap.NewAtomicLevelAt(ll), + inheritLevel: ll, + } +} + func (r *logWriter) SetLevels(ll zapcore.Level, unitLevels map[string]zapcore.Level) { // must hold to lock so Write doesn't access the unitLevels r.mx.Lock() @@ -165,14 +176,16 @@ func getLevel(evt map[string]interface{}, key string) zapcore.Level { } func unmarshalLevel(lvl *zapcore.Level, val string) error { - if val == "" { + switch val { + case "": return errors.New("empty val") - } else if val == "trace" { + case "trace": // zap doesn't handle trace level we cast to debug *lvl = zapcore.DebugLevel return nil + default: + return lvl.UnmarshalText([]byte(val)) } - return lvl.UnmarshalText([]byte(val)) } func getMessage(evt map[string]interface{}, key string) string { diff --git a/pkg/core/process/process.go b/pkg/core/process/process.go index f50b83cbe11..8d167d45176 100644 --- a/pkg/core/process/process.go +++ b/pkg/core/process/process.go @@ -148,9 +148,13 @@ func startContext(ctx context.Context, path string, uid, gid int, args []string, return nil, fmt.Errorf("failed to set option command for %q: %w", path, err) } } - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, fmt.Errorf("failed to create stdin for %q: %w", path, err) + + var stdin io.WriteCloser + if cmd.Stdin == nil { + stdin, err = cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdin for %q: %w", path, err) + } } var stderr io.ReadCloser diff --git a/testing/integration/ess/otel_test.go b/testing/integration/ess/otel_test.go index 38557377f0a..aa2bc6cbef7 100644 --- a/testing/integration/ess/otel_test.go +++ b/testing/integration/ess/otel_test.go @@ -21,6 +21,8 @@ import ( "text/template" "time" + "github.com/elastic/elastic-agent/pkg/control/v2/cproto" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -326,15 +328,24 @@ service: statusCtx, statusCancel := context.WithTimeout(ctx, 5*time.Second) defer statusCancel() - output, err := fixture.ExecStatus(statusCtx) - require.NoError(t, err, "status command failed") + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + status, statusErr := fixture.ExecStatus(statusCtx) + assert.NoError(collect, statusErr) + // agent should be healthy + assert.Equal(collect, int(cproto.State_HEALTHY), status.State) + // we should have no normal components running + assert.Zero(collect, len(status.Components)) + + // we should have filebeatreceiver and metricbeatreceiver running + otelCollectorStatus := status.Collector + require.NotNil(collect, otelCollectorStatus) + assert.Equal(collect, int(cproto.CollectorComponentStatus_StatusOK), otelCollectorStatus.Status) + return + }, 1*time.Minute, 1*time.Second) cancel() fixtureWg.Wait() - require.True(t, err == nil || err == context.Canceled || err == context.DeadlineExceeded, "Retrieved unexpected error: %s", err.Error()) - - assert.NotNil(t, output.Collector) - assert.Equal(t, 2, output.Collector.Status, "collector status should have been StatusOK") } func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesting.Fixture, tempDir string) {