diff --git a/go.mod b/go.mod index ea0b67e3246..4d1120f925f 100644 --- a/go.mod +++ b/go.mod @@ -50,6 +50,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.3.1 github.com/spf13/cobra v1.1.1 + github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 github.com/tcnksm/ghr v0.13.0 diff --git a/service/service.go b/service/service.go index b1c40718e60..e2675bcc2bb 100644 --- a/service/service.go +++ b/service/service.go @@ -103,17 +103,22 @@ type Parameters struct { ApplicationStartInfo component.ApplicationStartInfo // ConfigFactory that creates the configuration. // If it is not provided the default factory (FileLoaderConfigFactory) is used. - // The default factory loads the configuration specified as a command line flag. + // The default factory loads the configuration file and overrides component's configuration + // properties supplied via --set command line flag. ConfigFactory ConfigFactory // LoggingOptions provides a way to change behavior of zap logging. LoggingOptions []zap.Option } // ConfigFactory creates config. -type ConfigFactory func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) - -// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file. -func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { +// The ConfigFactory implementation should call AddSetFlagProperties to enable configuration passed via `--set` flag. +// Viper and command instances are passed from the Application. +// The factories also belong the Application and are equal to the factories passed via Parameters. +type ConfigFactory func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) + +// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file +// and from --set command line flag (if the flag is present). +func FileLoaderConfigFactory(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) { file := builder.GetConfigFile() if file == "" { return nil, errors.New("config file not specified") @@ -123,6 +128,21 @@ func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*co if err != nil { return nil, fmt.Errorf("error loading config file %q: %v", file, err) } + // The configuration is being loaded from two places: the config file and --set command line flag. + // The --set flag implementation (AddSetFlagProperties) creates a properties file from the --set flag + // that is loaded by a different viper instance. + // Viper implementation of v.MergeConfig(io.Reader) or v.MergeConfigMap(map[string]interface) + // does not work properly. + // The workaround is to call v.Set(string, interface) on all loaded properties from the config file + // and then do the same for --set flag in AddSetFlagProperties. + for _, k := range v.AllKeys() { + v.Set(k, v.Get(k)) + } + + // handle --set flag and override properties from the configuration file + if err := AddSetFlagProperties(v, cmd); err != nil { + return nil, fmt.Errorf("failed to process set flag: %v", err) + } return config.Load(v, factories) } @@ -170,6 +190,7 @@ func New(params Parameters) (*Application, error) { addFlags(flagSet) } rootCmd.Flags().AddGoFlagSet(flagSet) + addSetFlag(rootCmd.Flags()) app.rootCmd = rootCmd @@ -274,7 +295,7 @@ func (app *Application) setupConfigurationComponents(ctx context.Context, factor } app.logger.Info("Loading configuration...") - cfg, err := factory(app.v, app.factories) + cfg, err := factory(app.v, app.rootCmd, app.factories) if err != nil { return fmt.Errorf("cannot load configuration: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index bee323bd618..40a52146cc9 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -19,6 +19,7 @@ import ( "bufio" "context" "errors" + "flag" "fmt" "net/http" "sort" @@ -26,8 +27,10 @@ import ( "strings" "syscall" "testing" + "time" "github.com/prometheus/common/expfmt" + "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,6 +41,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/processor/attributesprocessor" + "go.opentelemetry.io/collector/processor/batchprocessor" + "go.opentelemetry.io/collector/receiver/jaegerreceiver" + "go.opentelemetry.io/collector/service/builder" "go.opentelemetry.io/collector/service/defaultcomponents" "go.opentelemetry.io/collector/testutil" ) @@ -137,7 +144,7 @@ func TestApplication_StartAsGoRoutine(t *testing.T) { params := Parameters{ ApplicationStartInfo: componenttest.TestApplicationStartInfo(), - ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { + ConfigFactory: func(v *viper.Viper, command *cobra.Command, factories component.Factories) (*configmodels.Config, error) { return constructMimumalOpConfig(t, factories), nil }, Factories: factories, @@ -412,7 +419,7 @@ func createExampleApplication(t *testing.T) *Application { app, err := New(Parameters{ Factories: factories, - ConfigFactory: func(v *viper.Viper, factories component.Factories) (c *configmodels.Config, err error) { + ConfigFactory: func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (c *configmodels.Config, err error) { config := &configmodels.Config{ Receivers: map[string]configmodels.Receiver{ string(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(), @@ -511,6 +518,105 @@ func TestApplication_GetExporters(t *testing.T) { <-appDone } +func TestSetFlag(t *testing.T) { + factories, err := defaultcomponents.Components() + require.NoError(t, err) + params := Parameters{ + Factories: factories, + } + t.Run("unknown_component", func(t *testing.T) { + app, err := New(params) + require.NoError(t, err) + err = app.rootCmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.doesnotexist.timeout=2s", + }) + require.NoError(t, err) + cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories) + require.Error(t, err) + require.Nil(t, cfg) + + }) + t.Run("component_not_added_to_pipeline", func(t *testing.T) { + app, err := New(params) + require.NoError(t, err) + err = app.rootCmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.batch/foo.timeout=2s", + }) + require.NoError(t, err) + cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories) + require.NoError(t, err) + assert.NotNil(t, cfg) + err = config.ValidateConfig(cfg, zap.NewNop()) + require.NoError(t, err) + + var processors []string + for k := range cfg.Processors { + processors = append(processors, k) + } + sort.Strings(processors) + // batch/foo is not added to the pipeline + assert.Equal(t, []string{"attributes", "batch", "batch/foo", "queued_retry"}, processors) + assert.Equal(t, []string{"attributes", "batch", "queued_retry"}, cfg.Service.Pipelines["traces"].Processors) + }) + t.Run("ok", func(t *testing.T) { + app, err := New(params) + require.NoError(t, err) + + err = app.rootCmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.batch.timeout=2s", + // Arrays are overridden and object arrays cannot be indexed + // this creates actions array of size 1 + "--set=processors.attributes.actions.key=foo", + "--set=processors.attributes.actions.value=bar", + "--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345", + "--set=extensions.health_check.port=8080", + }) + require.NoError(t, err) + cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories) + require.NoError(t, err) + require.NotNil(t, cfg) + err = config.ValidateConfig(cfg, zap.NewNop()) + require.NoError(t, err) + + assert.Equal(t, 3, len(cfg.Processors)) + batch := cfg.Processors["batch"].(*batchprocessor.Config) + assert.Equal(t, time.Second*2, batch.Timeout) + jaeger := cfg.Receivers["jaeger"].(*jaegerreceiver.Config) + assert.Equal(t, "localhost:12345", jaeger.GRPC.NetAddr.Endpoint) + attributes := cfg.Processors["attributes"].(*attributesprocessor.Config) + require.Equal(t, 1, len(attributes.Actions)) + assert.Equal(t, "foo", attributes.Actions[0].Key) + assert.Equal(t, "bar", attributes.Actions[0].Value) + }) +} + +func TestSetFlag_component_does_not_exist(t *testing.T) { + factories, err := defaultcomponents.Components() + require.NoError(t, err) + + v := config.NewViper() + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + fs := &flag.FlagSet{} + builder.Flags(fs) + cmd.Flags().AddGoFlagSet(fs) + cmd.ParseFlags([]string{ + "--config=testdata/otelcol-config.yaml", + "--set=processors.batch.timeout=2s", + // Arrays are overridden and object arrays cannot be indexed + // this creates actions array of size 1 + "--set=processors.attributes.actions.key=foo", + "--set=processors.attributes.actions.value=bar", + "--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345", + }) + cfg, err := FileLoaderConfigFactory(v, cmd, factories) + require.NoError(t, err) + require.NotNil(t, cfg) +} + func constructMimumalOpConfig(t *testing.T, factories component.Factories) *configmodels.Config { configStr := ` receivers: diff --git a/service/set_flag.go b/service/set_flag.go new file mode 100644 index 00000000000..b28f74ba762 --- /dev/null +++ b/service/set_flag.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "bytes" + "fmt" + "strings" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" + + "go.opentelemetry.io/collector/config" +) + +const ( + setFlagName = "set" + setFlagFileType = "properties" +) + +func addSetFlag(flagSet *pflag.FlagSet) { + flagSet.StringArray(setFlagName, []string{}, "Set arbitrary component config property. The component has to be defined in the config file and the flag has a higher precedence. Array config properties are overridden and maps are joined, note that only a single (first) array property can be set e.g. -set=processors.attributes.actions.key=some_key. Example --set=processors.batch.timeout=2s") +} + +// AddSetFlagProperties overrides properties from set flag(s) in supplied viper instance. +// The implementation reads set flag(s) from the cmd and passes the content to a new viper instance as .properties file. +// Then the properties from new viper instance are read and set to the supplied viper. +func AddSetFlagProperties(v *viper.Viper, cmd *cobra.Command) error { + flagProperties, err := cmd.Flags().GetStringArray(setFlagName) + if err != nil { + return err + } + if len(flagProperties) == 0 { + return nil + } + b := &bytes.Buffer{} + for _, property := range flagProperties { + property = strings.TrimSpace(property) + if _, err := fmt.Fprintf(b, "%s\n", property); err != nil { + return err + } + } + viperFlags := config.NewViper() + viperFlags.SetConfigType(setFlagFileType) + if err := viperFlags.ReadConfig(b); err != nil { + return fmt.Errorf("failed to read set flag config: %v", err) + } + + // flagProperties cannot be applied to v directly because + // v.MergeConfig(io.Reader) or v.MergeConfigMap(map[string]interface) does not work properly. + for _, k := range viperFlags.AllKeys() { + v.Set(k, viperFlags.Get(k)) + } + return nil +} diff --git a/service/set_flag_test.go b/service/set_flag_test.go new file mode 100644 index 00000000000..f6f0c63ffcc --- /dev/null +++ b/service/set_flag_test.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSetFlags(t *testing.T) { + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + + err := cmd.ParseFlags([]string{ + "--set=processors.batch.timeout=2s", + "--set=processors.batch/foo.timeout=3s", + "--set=receivers.otlp.protocols.grpc.endpoint=localhost:1818", + "--set=exporters.kafka.brokers=foo:9200,foo2:9200", + }) + require.NoError(t, err) + + v := viper.New() + err = AddSetFlagProperties(v, cmd) + require.NoError(t, err) + + settings := v.AllSettings() + assert.Equal(t, 4, len(settings)) + assert.Equal(t, "2s", v.Get("processors::batch::timeout")) + assert.Equal(t, "3s", v.Get("processors::batch/foo::timeout")) + assert.Equal(t, "foo:9200,foo2:9200", v.Get("exporters::kafka::brokers")) + assert.Equal(t, "localhost:1818", v.Get("receivers::otlp::protocols::grpc::endpoint")) +} + +func TestSetFlags_err_set_flag(t *testing.T) { + cmd := &cobra.Command{} + v := viper.New() + err := AddSetFlagProperties(v, cmd) + require.Error(t, err) +} + +func TestSetFlags_empty(t *testing.T) { + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + v := viper.New() + err := AddSetFlagProperties(v, cmd) + require.NoError(t, err) + assert.Equal(t, 0, len(v.AllSettings())) +} diff --git a/testbed/testbed/otelcol_runner.go b/testbed/testbed/otelcol_runner.go index 7ba6966beac..72a79f80231 100644 --- a/testbed/testbed/otelcol_runner.go +++ b/testbed/testbed/otelcol_runner.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/shirou/gopsutil/process" + "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -104,7 +105,7 @@ func (ipp *InProcessCollector) Start(args StartParams) error { Version: version.Version, GitHash: version.GitHash, }, - ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { + ConfigFactory: func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) { return ipp.config, nil }, Factories: ipp.factories,