From 5ea89edbbba82d8ef392b2c10d75bea8a6a7e938 Mon Sep 17 00:00:00 2001 From: Jay Camp Date: Fri, 28 Aug 2020 17:56:45 -0400 Subject: [PATCH] alternative implementation --- go.mod | 1 + service/service.go | 18 ++- service/set_flag.go | 59 +++++++ service/set_flag_test.go | 249 ++++++++++++++++++++++++++++++ testbed/testbed/otelcol_runner.go | 3 +- 5 files changed, 326 insertions(+), 4 deletions(-) create mode 100644 service/set_flag.go create mode 100644 service/set_flag_test.go diff --git a/go.mod b/go.mod index 36ad210e8e0..2e25afa58d2 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.3.1 github.com/spf13/cobra v1.0.0 + github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.6.1 github.com/tcnksm/ghr v0.13.0 diff --git a/service/service.go b/service/service.go index 83ed4c3d0a0..f793fc3dad4 100644 --- a/service/service.go +++ b/service/service.go @@ -111,10 +111,10 @@ type Parameters struct { } // ConfigFactory creates config. -type ConfigFactory func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) +type ConfigFactory func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) // FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file. -func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { +func FileLoaderConfigFactory(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) { file := builder.GetConfigFile() if file == "" { return nil, errors.New("config file not specified") @@ -124,6 +124,17 @@ func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*co if err != nil { return nil, fmt.Errorf("error loading config file %q: %v", file, err) } + + overrides, err := getSetConfig(cmd) + if err != nil { + return nil, fmt.Errorf("failed getting set overrides: %v", err) + } + + err = v.MergeConfigMap(overrides.AllSettings()) + if err != nil { + return nil, fmt.Errorf("failed merging overrides: %v", err) + } + return config.Load(v, factories) } @@ -171,6 +182,7 @@ func New(params Parameters) (*Application, error) { addFlags(flagSet) } rootCmd.Flags().AddGoFlagSet(flagSet) + addSetFlag(rootCmd.Flags()) app.rootCmd = rootCmd @@ -275,7 +287,7 @@ func (app *Application) setupConfigurationComponents(ctx context.Context, factor } app.logger.Info("Loading configuration...") - cfg, err := factory(app.v, app.factories) + cfg, err := factory(app.v, app.rootCmd, app.factories) if err != nil { return fmt.Errorf("cannot load configuration: %w", err) } diff --git a/service/set_flag.go b/service/set_flag.go new file mode 100644 index 00000000000..ddb11fcebf5 --- /dev/null +++ b/service/set_flag.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "bytes" + "fmt" + "strings" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" +) + +const ( + setFlagName = "set" +) + +func addSetFlag(flagSet *pflag.FlagSet) { + flagSet.StringArray(setFlagName, []string{}, "Set arbitrary component config property. The component has to be defined in the config file. The flag has a higher precedence over config file. The arrays are overridden. Example --set=processors.batch.timeout=2s") +} + +// getSetConfig gets set flags to provided cfg. +func getSetConfig(cmd *cobra.Command) (*viper.Viper, error) { + flagProperties, err := cmd.Flags().GetStringArray(setFlagName) + if err != nil { + return nil, err + } + if len(flagProperties) == 0 { + return viper.New(), nil + } + b := bytes.Buffer{} + for _, property := range flagProperties { + property = strings.TrimSpace(property) + if _, err := b.WriteString(property + "\n"); err != nil { + return nil, err + } + } + + viperConfig := viper.New() + viperConfig.SetConfigType("properties") + if err := viperConfig.ReadConfig(&b); err != nil { + return nil, fmt.Errorf("failed reading in viper config: %v", err) + } + + return viperConfig, nil +} diff --git a/service/set_flag_test.go b/service/set_flag_test.go new file mode 100644 index 00000000000..4b71ab5da11 --- /dev/null +++ b/service/set_flag_test.go @@ -0,0 +1,249 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "testing" + "time" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/exporter/kafkaexporter" + "go.opentelemetry.io/collector/processor/batchprocessor" + "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/resourceprocessor" + "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/service/defaultcomponents" +) + +func TestFlags(t *testing.T) { + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + + err := cmd.ParseFlags([]string{ + "--set=processors.batch.timeout=2s", + "--set=processors.batch/foo.timeout=3s", + // no effect - the batch/bar is not defined in the config + "--set=processors.batch/bar.timeout=3s", + // attributes is a list of objects, The arrays are overridden + "--set=processors.resource.attributes.key=key2", + // TODO arrays of objects cannot be indexed + // TODO the flag sets only the first element in the array + //"--set=processors.resource.attributes[0].key=key3", + // maps of primitive types are joined + "--set=processors.resource.labels.key2=value2", + "--set=receivers.otlp.protocols.grpc.endpoint=localhost:1818", + // arrays of primitive types are overridden + "--set=exporters.kafka.brokers=foo:9200,foo2:9200", + }) + require.NoError(t, err) + + cfg := &configmodels.Config{ + Processors: map[string]configmodels.Processor{ + "batch": &batchprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch", + }, + Timeout: time.Second * 10, + SendBatchSize: 10, + }, + "batch/foo": &batchprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch/foo", + }, + SendBatchSize: 20, + }, + "resource": &resourceprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + AttributesActions: []processorhelper.ActionKeyValue{ + {Key: "key"}, + }, + Labels: map[string]string{"key": "value"}, + }, + }, + Receivers: map[string]configmodels.Receiver{ + "otlp": &otlpreceiver.Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "otlp", + NameVal: "otlp", + }, + }, + }, + Exporters: map[string]configmodels.Exporter{ + "kafka": &kafkaexporter.Config{ + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: "kafka", + NameVal: "kafka", + }, + // This gets overridden + Brokers: []string{"bar:9200"}, + }, + }, + } + factories, err := defaultcomponents.Components() + require.NoError(t, err) + err = applyFlags(cmd, cfg, factories) + require.NoError(t, err) + assert.Equal(t, &configmodels.Config{ + Processors: map[string]configmodels.Processor{ + "batch": &batchprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch", + }, + Timeout: time.Second * 2, + SendBatchSize: 10, + }, + "batch/foo": &batchprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch/foo", + }, + Timeout: time.Second * 3, + SendBatchSize: 20, + }, + "resource": &resourceprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "resource", + NameVal: "resource", + }, + AttributesActions: []processorhelper.ActionKeyValue{ + //{Key: "key"}, + {Key: "key2"}, + }, + Labels: map[string]string{"key": "value", "key2": "value2"}, + }, + }, + Receivers: map[string]configmodels.Receiver{ + "otlp": &otlpreceiver.Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "otlp", + NameVal: "otlp", + }, + Protocols: otlpreceiver.Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + NetAddr: confignet.NetAddr{ + Endpoint: "localhost:1818", + }, + }, + }, + }, + }, + Exporters: map[string]configmodels.Exporter{ + "kafka": &kafkaexporter.Config{ + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: "kafka", + NameVal: "kafka", + }, + Brokers: []string{"foo:9200", "foo2:9200"}, + }, + }, + }, cfg) +} + +func TestFlags_component_with_custom_marshaller_is_not_defined_via_flags(t *testing.T) { + cmd := &cobra.Command{} + addSetFlag(cmd.Flags()) + + err := cmd.ParseFlags([]string{ + "--set=processors.batch.timeout=2s", + }) + require.NoError(t, err) + + cfg := &configmodels.Config{ + Processors: map[string]configmodels.Processor{ + "batch": &batchprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch", + }, + Timeout: time.Second * 10, + }, + }, + Receivers: map[string]configmodels.Receiver{ + "otlp": &otlpreceiver.Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "otlp", + NameVal: "otlp", + }, + Protocols: otlpreceiver.Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + NetAddr: confignet.NetAddr{ + Endpoint: "localhost:8090", + }, + }, + }, + }, + }, + Exporters: map[string]configmodels.Exporter{ + "kafka": &kafkaexporter.Config{ + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: "kafka", + NameVal: "kafka", + }, + Brokers: []string{"bar:9200"}, + }, + }, + } + factories, err := defaultcomponents.Components() + require.NoError(t, err) + err = applyFlags(cmd, cfg, factories) + require.NoError(t, err) + assert.Equal(t, &configmodels.Config{ + Processors: map[string]configmodels.Processor{ + "batch": &batchprocessor.Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch", + }, + Timeout: time.Second * 2, + }, + }, + Receivers: map[string]configmodels.Receiver{ + "otlp": &otlpreceiver.Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "otlp", + NameVal: "otlp", + }, + Protocols: otlpreceiver.Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + NetAddr: confignet.NetAddr{ + Endpoint: "localhost:8090", + }, + }, + }, + }, + }, + Exporters: map[string]configmodels.Exporter{ + "kafka": &kafkaexporter.Config{ + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: "kafka", + NameVal: "kafka", + }, + Brokers: []string{"bar:9200"}, + }, + }, + }, cfg) +} diff --git a/testbed/testbed/otelcol_runner.go b/testbed/testbed/otelcol_runner.go index 2ea0b19791e..c929c3315cf 100644 --- a/testbed/testbed/otelcol_runner.go +++ b/testbed/testbed/otelcol_runner.go @@ -19,6 +19,7 @@ import ( "strings" "github.com/shirou/gopsutil/process" + "github.com/spf13/cobra" "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -106,7 +107,7 @@ func (ipp *InProcessCollector) Start(args StartParams) (receiverAddr string, err Version: version.Version, GitHash: version.GitHash, }, - ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) { + ConfigFactory: func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) { return ipp.config, nil }, Factories: ipp.factories,