Skip to content

Commit

Permalink
alternative implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jrcamp committed Aug 28, 2020
1 parent 03faf67 commit 5ea89ed
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 4 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/tcnksm/ghr v0.13.0
Expand Down
18 changes: 15 additions & 3 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ type Parameters struct {
}

// ConfigFactory creates config.
type ConfigFactory func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error)
type ConfigFactory func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error)

// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file.
func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) {
func FileLoaderConfigFactory(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) {
file := builder.GetConfigFile()
if file == "" {
return nil, errors.New("config file not specified")
Expand All @@ -124,6 +124,17 @@ func FileLoaderConfigFactory(v *viper.Viper, factories component.Factories) (*co
if err != nil {
return nil, fmt.Errorf("error loading config file %q: %v", file, err)
}

overrides, err := getSetConfig(cmd)
if err != nil {
return nil, fmt.Errorf("failed getting set overrides: %v", err)
}

err = v.MergeConfigMap(overrides.AllSettings())
if err != nil {
return nil, fmt.Errorf("failed merging overrides: %v", err)
}

return config.Load(v, factories)
}

Expand Down Expand Up @@ -171,6 +182,7 @@ func New(params Parameters) (*Application, error) {
addFlags(flagSet)
}
rootCmd.Flags().AddGoFlagSet(flagSet)
addSetFlag(rootCmd.Flags())

app.rootCmd = rootCmd

Expand Down Expand Up @@ -275,7 +287,7 @@ func (app *Application) setupConfigurationComponents(ctx context.Context, factor
}

app.logger.Info("Loading configuration...")
cfg, err := factory(app.v, app.factories)
cfg, err := factory(app.v, app.rootCmd, app.factories)
if err != nil {
return fmt.Errorf("cannot load configuration: %w", err)
}
Expand Down
59 changes: 59 additions & 0 deletions service/set_flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

import (
"bytes"
"fmt"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)

const (
setFlagName = "set"
)

func addSetFlag(flagSet *pflag.FlagSet) {
flagSet.StringArray(setFlagName, []string{}, "Set arbitrary component config property. The component has to be defined in the config file. The flag has a higher precedence over config file. The arrays are overridden. Example --set=processors.batch.timeout=2s")
}

// getSetConfig gets set flags to provided cfg.
func getSetConfig(cmd *cobra.Command) (*viper.Viper, error) {
flagProperties, err := cmd.Flags().GetStringArray(setFlagName)
if err != nil {
return nil, err
}
if len(flagProperties) == 0 {
return viper.New(), nil
}
b := bytes.Buffer{}
for _, property := range flagProperties {
property = strings.TrimSpace(property)
if _, err := b.WriteString(property + "\n"); err != nil {
return nil, err
}
}

viperConfig := viper.New()
viperConfig.SetConfigType("properties")
if err := viperConfig.ReadConfig(&b); err != nil {
return nil, fmt.Errorf("failed reading in viper config: %v", err)
}

return viperConfig, nil
}
249 changes: 249 additions & 0 deletions service/set_flag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service

import (
"testing"
"time"

"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter/kafkaexporter"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/resourceprocessor"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/service/defaultcomponents"
)

func TestFlags(t *testing.T) {
cmd := &cobra.Command{}
addSetFlag(cmd.Flags())

err := cmd.ParseFlags([]string{
"--set=processors.batch.timeout=2s",
"--set=processors.batch/foo.timeout=3s",
// no effect - the batch/bar is not defined in the config
"--set=processors.batch/bar.timeout=3s",
// attributes is a list of objects, The arrays are overridden
"--set=processors.resource.attributes.key=key2",
// TODO arrays of objects cannot be indexed
// TODO the flag sets only the first element in the array
//"--set=processors.resource.attributes[0].key=key3",
// maps of primitive types are joined
"--set=processors.resource.labels.key2=value2",
"--set=receivers.otlp.protocols.grpc.endpoint=localhost:1818",
// arrays of primitive types are overridden
"--set=exporters.kafka.brokers=foo:9200,foo2:9200",
})
require.NoError(t, err)

cfg := &configmodels.Config{
Processors: map[string]configmodels.Processor{
"batch": &batchprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch",
},
Timeout: time.Second * 10,
SendBatchSize: 10,
},
"batch/foo": &batchprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch/foo",
},
SendBatchSize: 20,
},
"resource": &resourceprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
AttributesActions: []processorhelper.ActionKeyValue{
{Key: "key"},
},
Labels: map[string]string{"key": "value"},
},
},
Receivers: map[string]configmodels.Receiver{
"otlp": &otlpreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: "otlp",
NameVal: "otlp",
},
},
},
Exporters: map[string]configmodels.Exporter{
"kafka": &kafkaexporter.Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: "kafka",
NameVal: "kafka",
},
// This gets overridden
Brokers: []string{"bar:9200"},
},
},
}
factories, err := defaultcomponents.Components()
require.NoError(t, err)
err = applyFlags(cmd, cfg, factories)
require.NoError(t, err)
assert.Equal(t, &configmodels.Config{
Processors: map[string]configmodels.Processor{
"batch": &batchprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch",
},
Timeout: time.Second * 2,
SendBatchSize: 10,
},
"batch/foo": &batchprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch/foo",
},
Timeout: time.Second * 3,
SendBatchSize: 20,
},
"resource": &resourceprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "resource",
NameVal: "resource",
},
AttributesActions: []processorhelper.ActionKeyValue{
//{Key: "key"},
{Key: "key2"},
},
Labels: map[string]string{"key": "value", "key2": "value2"},
},
},
Receivers: map[string]configmodels.Receiver{
"otlp": &otlpreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: "otlp",
NameVal: "otlp",
},
Protocols: otlpreceiver.Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:1818",
},
},
},
},
},
Exporters: map[string]configmodels.Exporter{
"kafka": &kafkaexporter.Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: "kafka",
NameVal: "kafka",
},
Brokers: []string{"foo:9200", "foo2:9200"},
},
},
}, cfg)
}

func TestFlags_component_with_custom_marshaller_is_not_defined_via_flags(t *testing.T) {
cmd := &cobra.Command{}
addSetFlag(cmd.Flags())

err := cmd.ParseFlags([]string{
"--set=processors.batch.timeout=2s",
})
require.NoError(t, err)

cfg := &configmodels.Config{
Processors: map[string]configmodels.Processor{
"batch": &batchprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch",
},
Timeout: time.Second * 10,
},
},
Receivers: map[string]configmodels.Receiver{
"otlp": &otlpreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: "otlp",
NameVal: "otlp",
},
Protocols: otlpreceiver.Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:8090",
},
},
},
},
},
Exporters: map[string]configmodels.Exporter{
"kafka": &kafkaexporter.Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: "kafka",
NameVal: "kafka",
},
Brokers: []string{"bar:9200"},
},
},
}
factories, err := defaultcomponents.Components()
require.NoError(t, err)
err = applyFlags(cmd, cfg, factories)
require.NoError(t, err)
assert.Equal(t, &configmodels.Config{
Processors: map[string]configmodels.Processor{
"batch": &batchprocessor.Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch",
},
Timeout: time.Second * 2,
},
},
Receivers: map[string]configmodels.Receiver{
"otlp": &otlpreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: "otlp",
NameVal: "otlp",
},
Protocols: otlpreceiver.Protocols{
GRPC: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:8090",
},
},
},
},
},
Exporters: map[string]configmodels.Exporter{
"kafka": &kafkaexporter.Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: "kafka",
NameVal: "kafka",
},
Brokers: []string{"bar:9200"},
},
},
}, cfg)
}
3 changes: 2 additions & 1 deletion testbed/testbed/otelcol_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strings"

"github.com/shirou/gopsutil/process"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (ipp *InProcessCollector) Start(args StartParams) (receiverAddr string, err
Version: version.Version,
GitHash: version.GitHash,
},
ConfigFactory: func(v *viper.Viper, factories component.Factories) (*configmodels.Config, error) {
ConfigFactory: func(v *viper.Viper, cmd *cobra.Command, factories component.Factories) (*configmodels.Config, error) {
return ipp.config, nil
},
Factories: ipp.factories,
Expand Down

0 comments on commit 5ea89ed

Please sign in to comment.