Skip to content

Commit

Permalink
Revert "Revert "Support set flag for component configs (open-telemetr…
Browse files Browse the repository at this point in the history
…y#1640)" (open-telemetry#1905)"

This reverts commit 2efdb28.
  • Loading branch information
joe-elliott committed Nov 30, 2020
1 parent 7c28105 commit 21713f2
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 9 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/tinylib/msgp v1.1.5
Expand Down
33 changes: 27 additions & 6 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,22 @@ type Parameters struct {
ApplicationStartInfo component.ApplicationStartInfo
// ConfigFactory that creates the configuration.
// If it is not provided the default factory (FileLoaderConfigFactory) is used.
// The default factory loads the configuration specified as a command line flag.
// The default factory loads the configuration file and overrides component's configuration
// properties supplied via --set command line flag.
ConfigFactory ConfigFactory
// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option
}

// ConfigFactory creates config.
type ConfigFactory func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error)

// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file.
func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) {
// The ConfigFactory implementation should call AddSetFlagProperties to enable configuration passed via `--set` flag.
// Viper and command instances are passed from the Application.
// The factories also belong the Application and are equal to the factories passed via Parameters.
type ConfigFactory func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error)

// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file
// and from --set command line flag (if the flag is present).
func FileLoaderConfigFactory(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) {
file := builder.GetConfigFile()
if file == "" {
return nil, errors.New("config file not specified")
Expand All @@ -124,6 +129,21 @@ func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*co
if err != nil {
return nil, fmt.Errorf("error loading config file %q: %v", file, err)
}
// The configuration is being loaded from two places: the config file and --set command line flag.
// The --set flag implementation (AddSetFlagProperties) creates a properties file from the --set flag
// that is loaded by a different viper instance.
// Viper implementation of v.MergeConfig(io.Reader) or v.MergeConfigMap(map[string]interface)
// does not work properly.
// The workaround is to call v.Set(string, interface) on all loaded properties from the config file
// and then do the same for --set flag in AddSetFlagProperties.
for _, k := range v.AllKeys() {
v.Set(k, v.Get(k))
}

// handle --set flag and override properties from the configuration file
if err := AddSetFlagProperties(v, cmd); err != nil {
return nil, fmt.Errorf("failed to process set flag: %v", err)
}
return config.Load(v, factories)
}

Expand Down Expand Up @@ -172,6 +192,7 @@ func New(params Parameters) (*Application, error) {
addFlags(flagSet)
}
rootCmd.Flags().AddGoFlagSet(flagSet)
addSetFlag(rootCmd.Flags())

app.rootCmd = rootCmd

Expand Down Expand Up @@ -276,7 +297,7 @@ func (app *Application) setupConfigurationComponents(ctx context.Context, factor
}

app.logger.Info("Loading configuration...")
cfg, err := factory(app.v, app.factories)
cfg, err := factory(app.v, app.rootCmd, app.factories)
if err != nil {
return fmt.Errorf("cannot load configuration: %w", err)
}
Expand Down
110 changes: 108 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"syscall"
"testing"
"time"

"github.com/prometheus/common/expfmt"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -38,6 +41,10 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/processor/attributesprocessor"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/service/builder"
"go.opentelemetry.io/collector/service/defaultcomponents"
"go.opentelemetry.io/collector/testutil"
)
Expand Down Expand Up @@ -137,7 +144,7 @@ func TestApplication_StartAsGoRoutine(t *testing.T) {

params := Parameters{
ApplicationStartInfo: componenttest.TestApplicationStartInfo(),
ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) {
ConfigFactory: func(v *viper.Viper, command *cobra.Command, factories component.Factories) (*configmodels.Config, error) {
return constructMimumalOpConfig(t, factories), nil
},
Factories: factories,
Expand Down Expand Up @@ -412,7 +419,7 @@ func createExampleApplication(t *testing.T) *Application {

app, err := New(Parameters{
Factories: factories,
ConfigFactory: func(v *viper.Viper, factories component.Factories) (c *configmodels.Config, err error) {
ConfigFactory: func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (c *configmodels.Config, err error) {
config := &configmodels.Config{
Receivers: map[string]configmodels.Receiver{
string(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(),
Expand Down Expand Up @@ -511,6 +518,105 @@ func TestApplication_GetExporters(t *testing.T) {
<-appDone
}

func TestSetFlag(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)
params := Parameters{
Factories: factories,
}
t.Run("unknown_component", func(t *testing.T) {
app, err := New(params)
require.NoError(t, err)
err = app.rootCmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.doesnotexist.timeout=2s",
})
require.NoError(t, err)
cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories)
require.Error(t, err)
require.Nil(t, cfg)

})
t.Run("component_not_added_to_pipeline", func(t *testing.T) {
app, err := New(params)
require.NoError(t, err)
err = app.rootCmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.batch/foo.timeout=2s",
})
require.NoError(t, err)
cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories)
require.NoError(t, err)
assert.NotNil(t, cfg)
err = config.ValidateConfig(cfg, zap.NewNop())
require.NoError(t, err)

var processors []string
for k := range cfg.Processors {
processors = append(processors, k)
}
sort.Strings(processors)
// batch/foo is not added to the pipeline
assert.Equal(t, []string{"attributes", "batch", "batch/foo", "queued_retry"}, processors)
assert.Equal(t, []string{"attributes", "batch", "queued_retry"}, cfg.Service.Pipelines["traces"].Processors)
})
t.Run("ok", func(t *testing.T) {
app, err := New(params)
require.NoError(t, err)

err = app.rootCmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.batch.timeout=2s",
// Arrays are overridden and object arrays cannot be indexed
// this creates actions array of size 1
"--set=processors.attributes.actions.key=foo",
"--set=processors.attributes.actions.value=bar",
"--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345",
"--set=extensions.health_check.port=8080",
})
require.NoError(t, err)
cfg, err := FileLoaderConfigFactory(app.v, app.rootCmd, factories)
require.NoError(t, err)
require.NotNil(t, cfg)
err = config.ValidateConfig(cfg, zap.NewNop())
require.NoError(t, err)

assert.Equal(t, 3, len(cfg.Processors))
batch := cfg.Processors["batch"].(*batchprocessor.Config)
assert.Equal(t, time.Second*2, batch.Timeout)
jaeger := cfg.Receivers["jaeger"].(*jaegerreceiver.Config)
assert.Equal(t, "localhost:12345", jaeger.GRPC.NetAddr.Endpoint)
attributes := cfg.Processors["attributes"].(*attributesprocessor.Config)
require.Equal(t, 1, len(attributes.Actions))
assert.Equal(t, "foo", attributes.Actions[0].Key)
assert.Equal(t, "bar", attributes.Actions[0].Value)
})
}

func TestSetFlag_component_does_not_exist(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)

v := config.NewViper()
cmd := &cobra.Command{}
addSetFlag(cmd.Flags())
fs := &flag.FlagSet{}
builder.Flags(fs)
cmd.Flags().AddGoFlagSet(fs)
cmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.batch.timeout=2s",
// Arrays are overridden and object arrays cannot be indexed
// this creates actions array of size 1
"--set=processors.attributes.actions.key=foo",
"--set=processors.attributes.actions.value=bar",
"--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345",
})
cfg, err := FileLoaderConfigFactory(v, cmd, factories)
require.NoError(t, err)
require.NotNil(t, cfg)
}

func constructMimumalOpConfig(t *testing.T, factories component.Factories) *configmodels.Config {
configStr := `
receivers:
Expand Down
68 changes: 68 additions & 0 deletions service/set_flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

import (
"bytes"
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"go.opentelemetry.io/collector/config"
)

const (
setFlagName = "set"
setFlagFileType = "properties"
)

func addSetFlag(flagSet *pflag.FlagSet) {
flagSet.StringArray(setFlagName, []string{}, "Set arbitrary component config property. The component has to be defined in the config file and the flag has a higher precedence. Array config properties are overridden and maps are joined, note that only a single (first) array property can be set e.g. -set=processors.attributes.actions.key=some_key. Example --set=processors.batch.timeout=2s")
}

// AddSetFlagProperties overrides properties from set flag(s) in supplied viper instance.
// The implementation reads set flag(s) from the cmd and passes the content to a new viper instance as .properties file.
// Then the properties from new viper instance are read and set to the supplied viper.
func AddSetFlagProperties(v *viper.Viper, cmd *cobra.Command) error {
flagProperties, err := cmd.Flags().GetStringArray(setFlagName)
if err != nil {
return err
}
if len(flagProperties) == 0 {
return nil
}
b := &bytes.Buffer{}
for _, property := range flagProperties {
property = strings.TrimSpace(property)
if _, err := fmt.Fprintf(b, "%s\n", property); err != nil {
return err
}
}
viperFlags := config.NewViper()
viperFlags.SetConfigType(setFlagFileType)
if err := viperFlags.ReadConfig(b); err != nil {
return fmt.Errorf("failed to read set flag config: %v", err)
}

// flagProperties cannot be applied to v directly because
// v.MergeConfig(io.Reader) or v.MergeConfigMap(map[string]interface) does not work properly.
for _, k := range viperFlags.AllKeys() {
v.Set(k, viperFlags.Get(k))
}
return nil
}
64 changes: 64 additions & 0 deletions service/set_flag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

import (
"testing"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSetFlags(t *testing.T) {
cmd := &cobra.Command{}
addSetFlag(cmd.Flags())

err := cmd.ParseFlags([]string{
"--set=processors.batch.timeout=2s",
"--set=processors.batch/foo.timeout=3s",
"--set=receivers.otlp.protocols.grpc.endpoint=localhost:1818",
"--set=exporters.kafka.brokers=foo:9200,foo2:9200",
})
require.NoError(t, err)

v := viper.New()
err = AddSetFlagProperties(v, cmd)
require.NoError(t, err)

settings := v.AllSettings()
assert.Equal(t, 4, len(settings))
assert.Equal(t, "2s", v.Get("processors::batch::timeout"))
assert.Equal(t, "3s", v.Get("processors::batch/foo::timeout"))
assert.Equal(t, "foo:9200,foo2:9200", v.Get("exporters::kafka::brokers"))
assert.Equal(t, "localhost:1818", v.Get("receivers::otlp::protocols::grpc::endpoint"))
}

func TestSetFlags_err_set_flag(t *testing.T) {
cmd := &cobra.Command{}
v := viper.New()
err := AddSetFlagProperties(v, cmd)
require.Error(t, err)
}

func TestSetFlags_empty(t *testing.T) {
cmd := &cobra.Command{}
addSetFlag(cmd.Flags())
v := viper.New()
err := AddSetFlagProperties(v, cmd)
require.NoError(t, err)
assert.Equal(t, 0, len(v.AllSettings()))
}
3 changes: 2 additions & 1 deletion testbed/testbed/otelcol_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/shirou/gopsutil/process"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -104,7 +105,7 @@ func (ipp *InProcessCollector) Start(args StartParams) error {
Version: version.Version,
GitHash: version.GitHash,
},
ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) {
ConfigFactory: func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) {
return ipp.config, nil
},
Factories: ipp.factories,
Expand Down

0 comments on commit 21713f2

Please sign in to comment.