From f682219f06254c723c213811d14a5cf51f1084f5 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 25 Jun 2019 09:14:10 -0400 Subject: [PATCH] Add factory and new-style config for Batch processor This is part of remaining migration to new configuration format. - Added zap.Logger to ProcessorFactory functions because it is needed by this processor (and can be used if needed in other processors in the future). - Added Name/SetName to Processor interface because it is needed by this processor (this now also makes Processor uniform with Receiver and Exporter interfaces). Github issue: https://github.com/open-telemetry/opentelemetry-service/issues/52 Testing done: make --- .../app/builder/pipelines_builder.go | 4 +- .../collector/testdata/otelsvc-config.yaml | 4 +- .../collector/processor/nodebatcher/config.go | 45 ++++++++ .../processor/nodebatcher/config_test.go | 63 +++++++++++ .../processor/nodebatcher/factory.go | 105 ++++++++++++++++++ .../processor/nodebatcher/factory_test.go | 49 ++++++++ .../nodebatcher/testdata/config.yaml | 20 ++++ internal/configmodels/configmodels.go | 12 ++ internal/configv2/configv2.go | 1 + internal/configv2/configv2_test.go | 1 + internal/configv2/example_factories.go | 3 + internal/factories/factories.go | 5 +- internal/factories/factories_test.go | 8 +- .../addattributesprocessor/config_test.go | 1 + processor/addattributesprocessor/factory.go | 11 +- .../addattributesprocessor/factory_test.go | 6 +- 16 files changed, 325 insertions(+), 13 deletions(-) create mode 100644 internal/collector/processor/nodebatcher/config.go create mode 100644 internal/collector/processor/nodebatcher/config_test.go create mode 100644 internal/collector/processor/nodebatcher/factory.go create mode 100644 internal/collector/processor/nodebatcher/factory_test.go create mode 100644 internal/collector/processor/nodebatcher/testdata/config.yaml diff --git a/cmd/occollector/app/builder/pipelines_builder.go b/cmd/occollector/app/builder/pipelines_builder.go index 45ee803f261..dfe36834818 100644 --- a/cmd/occollector/app/builder/pipelines_builder.go +++ b/cmd/occollector/app/builder/pipelines_builder.go @@ -104,9 +104,9 @@ func (pb *PipelinesBuilder) buildPipeline( var err error switch pipelineCfg.InputType { case configmodels.TracesDataType: - tc, err = factory.CreateTraceProcessor(tc, procCfg) + tc, err = factory.CreateTraceProcessor(pb.logger, tc, procCfg) case configmodels.MetricsDataType: - mc, err = factory.CreateMetricsProcessor(mc, procCfg) + mc, err = factory.CreateMetricsProcessor(pb.logger, mc, procCfg) } if err != nil { diff --git a/cmd/occollector/app/collector/testdata/otelsvc-config.yaml b/cmd/occollector/app/collector/testdata/otelsvc-config.yaml index a28a771ef81..963a848e6d7 100644 --- a/cmd/occollector/app/collector/testdata/otelsvc-config.yaml +++ b/cmd/occollector/app/collector/testdata/otelsvc-config.yaml @@ -10,9 +10,11 @@ exporters: processors: attributes: enabled: true + batch: + enabled: true pipelines: traces: receivers: [jaeger] - processors: [attributes] + processors: [attributes, batch] exporters: [opencensus] diff --git a/internal/collector/processor/nodebatcher/config.go b/internal/collector/processor/nodebatcher/config.go new file mode 100644 index 00000000000..f46b90a499f --- /dev/null +++ b/internal/collector/processor/nodebatcher/config.go @@ -0,0 +1,45 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebatcher + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-service/internal/configmodels" +) + +// ConfigV2 defines configuration for batch processor. +type ConfigV2 struct { + configmodels.ProcessorSettings `mapstructure:",squash"` + + // Timeout sets the time after which a batch will be sent regardless of size + Timeout *time.Duration `mapstructure:"timeout,omitempty"` + + // SendBatchSize is the size of a batch which after hit, will trigger it to be sent. + SendBatchSize *int `mapstructure:"send-batch-size,omitempty"` + + // NumTickers sets the number of tickers to use to divide the work of looping + // over batch buckets. This is an advanced configuration option. + NumTickers int `mapstructure:"num-tickers,omitempty"` + + // TickTime sets time interval at which the tickers tick. This is an advanced + // configuration option. + TickTime *time.Duration `mapstructure:"tick-time,omitempty"` + + // RemoveAfterTicks is the number of ticks that must pass without a span arriving + // from a node after which the batcher for that node will be deleted. This is an + // advanved configuration option. + RemoveAfterTicks *int `mapstructure:"remove-after-ticks,omitempty"` +} diff --git a/internal/collector/processor/nodebatcher/config_test.go b/internal/collector/processor/nodebatcher/config_test.go new file mode 100644 index 00000000000..7f84c7772f0 --- /dev/null +++ b/internal/collector/processor/nodebatcher/config_test.go @@ -0,0 +1,63 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebatcher + +import ( + "path" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-service/internal/configmodels" + "github.com/open-telemetry/opentelemetry-service/internal/configv2" + "github.com/open-telemetry/opentelemetry-service/internal/factories" +) + +var _ = configv2.RegisterTestFactories() + +func TestLoadConfig(t *testing.T) { + + factory := factories.GetProcessorFactory(typeStr) + + config, err := configv2.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml")) + + require.Nil(t, err) + require.NotNil(t, config) + + p0 := config.Processors["batch"] + assert.Equal(t, p0, factory.CreateDefaultConfig()) + + p1 := config.Processors["batch/2"] + + timeout := time.Second * 10 + tickTime := time.Second * 5 + removeAfterTicks := 20 + sendBatchSize := 1000 + + assert.Equal(t, p1, + &ConfigV2{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "batch", + NameVal: "batch/2", + }, + Timeout: &timeout, + NumTickers: 10, + RemoveAfterTicks: &removeAfterTicks, + SendBatchSize: &sendBatchSize, + TickTime: &tickTime, + }) +} diff --git a/internal/collector/processor/nodebatcher/factory.go b/internal/collector/processor/nodebatcher/factory.go new file mode 100644 index 00000000000..1c35f6dcecd --- /dev/null +++ b/internal/collector/processor/nodebatcher/factory.go @@ -0,0 +1,105 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebatcher + +import ( + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-service/consumer" + "github.com/open-telemetry/opentelemetry-service/internal/configmodels" + "github.com/open-telemetry/opentelemetry-service/internal/factories" + "github.com/open-telemetry/opentelemetry-service/processor" +) + +var _ = factories.RegisterProcessorFactory(&processorFactory{}) + +const ( + // The value of "type" key in configuration. + typeStr = "batch" +) + +// processorFactory is the factory for batch processor. +type processorFactory struct { +} + +// Type gets the type of the config created by this factory. +func (f *processorFactory) Type() string { + return typeStr +} + +// CreateDefaultConfig creates the default configuration for processor. +func (f *processorFactory) CreateDefaultConfig() configmodels.Processor { + removeAfterTicks := int(defaultRemoveAfterCycles) + sendBatchSize := int(defaultSendBatchSize) + tickTime := defaultTickTime + timeout := defaultTimeout + + return &ConfigV2{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + RemoveAfterTicks: &removeAfterTicks, + SendBatchSize: &sendBatchSize, + NumTickers: defaultNumTickers, + TickTime: &tickTime, + Timeout: &timeout, + } +} + +// CreateTraceProcessor creates a trace processor based on this config. +func (f *processorFactory) CreateTraceProcessor( + logger *zap.Logger, + nextConsumer consumer.TraceConsumer, + c configmodels.Processor, +) (processor.TraceProcessor, error) { + cfg := c.(*ConfigV2) + + var batchingOptions []Option + if cfg.Timeout != nil { + batchingOptions = append(batchingOptions, WithTimeout(*cfg.Timeout)) + } + if cfg.NumTickers > 0 { + batchingOptions = append( + batchingOptions, WithNumTickers(cfg.NumTickers), + ) + } + if cfg.TickTime != nil { + batchingOptions = append( + batchingOptions, WithTickTime(*cfg.TickTime), + ) + } + if cfg.SendBatchSize != nil { + batchingOptions = append( + batchingOptions, WithSendBatchSize(*cfg.SendBatchSize), + ) + } + if cfg.RemoveAfterTicks != nil { + batchingOptions = append( + batchingOptions, WithRemoveAfterTicks(*cfg.RemoveAfterTicks), + ) + } + + return NewBatcher(cfg.NameVal, logger, nextConsumer, batchingOptions...), nil +} + +// CreateMetricsProcessor creates a metrics processor based on this config. +func (f *processorFactory) CreateMetricsProcessor( + logger *zap.Logger, + nextConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (processor.MetricsProcessor, error) { + return nil, factories.ErrDataTypeIsNotSupported +} diff --git a/internal/collector/processor/nodebatcher/factory_test.go b/internal/collector/processor/nodebatcher/factory_test.go new file mode 100644 index 00000000000..6ecffefbb3c --- /dev/null +++ b/internal/collector/processor/nodebatcher/factory_test.go @@ -0,0 +1,49 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebatcher + +import ( + "testing" + + "go.uber.org/zap" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-service/internal/factories" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := factories.GetProcessorFactory(typeStr) + require.NotNil(t, factory) + + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") +} + +func TestCreateProcessor(t *testing.T) { + factory := factories.GetProcessorFactory(typeStr) + require.NotNil(t, factory) + + cfg := factory.CreateDefaultConfig() + + tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg) + assert.NotNil(t, tp) + assert.NoError(t, err, "cannot create trace processor") + + mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) + assert.Nil(t, mp) + assert.Error(t, err, "should not be able to create metric processor") +} diff --git a/internal/collector/processor/nodebatcher/testdata/config.yaml b/internal/collector/processor/nodebatcher/testdata/config.yaml new file mode 100644 index 00000000000..b5dac8ad24e --- /dev/null +++ b/internal/collector/processor/nodebatcher/testdata/config.yaml @@ -0,0 +1,20 @@ +receivers: + examplereceiver: + +processors: + batch: + batch/2: + timeout: 10s + send-batch-size: 1000 + num-tickers: 10 + tick-time: 5s + remove-after-ticks: 20 + +exporters: + exampleexporter: + +pipelines: + traces: + receivers: [examplereceiver] + processors: [batch/2] + exporters: [exampleexporter] diff --git a/internal/configmodels/configmodels.go b/internal/configmodels/configmodels.go index 67c32d22c8f..4685ccff48c 100644 --- a/internal/configmodels/configmodels.go +++ b/internal/configmodels/configmodels.go @@ -69,6 +69,7 @@ type Exporters map[string]Exporter // Processor is the configuration of a processor. Specific processors must implement this // interface and will typically embed ProcessorSettings struct or a struct that extends it. type Processor interface { + NamedEntity Type() string SetType(typeStr string) } @@ -188,9 +189,20 @@ func (es *ExporterSettings) SetType(typeStr string) { // Specific processors can embed this struct and extend it with more fields if needed. type ProcessorSettings struct { TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` Enabled bool `mapstructure:"enabled"` } +// Name gets the processor name. +func (proc *ProcessorSettings) Name() string { + return proc.NameVal +} + +// SetName sets the processor name. +func (proc *ProcessorSettings) SetName(name string) { + proc.NameVal = name +} + // Type sets the processor type. func (proc *ProcessorSettings) Type() string { return proc.TypeVal diff --git a/internal/configv2/configv2.go b/internal/configv2/configv2.go index 9680bb7fb0a..39796b30a85 100644 --- a/internal/configv2/configv2.go +++ b/internal/configv2/configv2.go @@ -315,6 +315,7 @@ func loadProcessors(v *viper.Viper) (configmodels.Processors, error) { // Create the default config for this processors processorCfg := factory.CreateDefaultConfig() processorCfg.SetType(typeStr) + processorCfg.SetName(fullName) // Now that the default config struct is created we can Unmarshal into it // and it will apply user-defined config on top of the default. diff --git a/internal/configv2/configv2_test.go b/internal/configv2/configv2_test.go index e2e6b3cac39..c1104809454 100644 --- a/internal/configv2/configv2_test.go +++ b/internal/configv2/configv2_test.go @@ -88,6 +88,7 @@ func TestDecodeConfig(t *testing.T) { &ExampleProcessor{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "exampleprocessor", + NameVal: "exampleprocessor", Enabled: false, }, ExtraSetting: "some export string", diff --git a/internal/configv2/example_factories.go b/internal/configv2/example_factories.go index 2b7712c292a..a29f8a92d6c 100644 --- a/internal/configv2/example_factories.go +++ b/internal/configv2/example_factories.go @@ -23,6 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-service/internal/factories" "github.com/open-telemetry/opentelemetry-service/processor" "github.com/open-telemetry/opentelemetry-service/receiver" + "go.uber.org/zap" ) // ExampleReceiver is for testing purposes. We are defining an example config and factory @@ -302,6 +303,7 @@ func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor { // CreateTraceProcessor creates a trace processor based on this config. func (f *ExampleProcessorFactory) CreateTraceProcessor( + logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg configmodels.Processor, ) (processor.TraceProcessor, error) { @@ -310,6 +312,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor( // CreateMetricsProcessor creates a metrics processor based on this config. func (f *ExampleProcessorFactory) CreateMetricsProcessor( + logger *zap.Logger, nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor, ) (processor.MetricsProcessor, error) { diff --git a/internal/factories/factories.go b/internal/factories/factories.go index 1c2b51f80c0..604c78f8a20 100644 --- a/internal/factories/factories.go +++ b/internal/factories/factories.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/spf13/viper" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/internal/configmodels" @@ -138,13 +139,13 @@ type ProcessorFactory interface { // CreateTraceProcessor creates a trace processor based on this config. // If the processor type does not support tracing or if the config is not valid // error will be returned instead. - CreateTraceProcessor(nextConsumer consumer.TraceConsumer, + CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg configmodels.Processor) (processor.TraceProcessor, error) // CreateMetricsProcessor creates a metrics processor based on this config. // If the processor type does not support metrics or if the config is not valid // error will be returned instead. - CreateMetricsProcessor(nextConsumer consumer.MetricsConsumer, + CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor) (processor.MetricsProcessor, error) } diff --git a/internal/factories/factories_test.go b/internal/factories/factories_test.go index 8b73a5bb5ab..c0ef04891ea 100644 --- a/internal/factories/factories_test.go +++ b/internal/factories/factories_test.go @@ -18,12 +18,12 @@ import ( "context" "testing" - "github.com/open-telemetry/opentelemetry-service/processor" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-service/consumer" - "github.com/open-telemetry/opentelemetry-service/receiver" - "github.com/open-telemetry/opentelemetry-service/internal/configmodels" + "github.com/open-telemetry/opentelemetry-service/processor" + "github.com/open-telemetry/opentelemetry-service/receiver" ) type ExampleReceiverFactory struct { @@ -157,6 +157,7 @@ func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor { // CreateTraceProcessor creates a trace processor based on this config. func (f *ExampleProcessorFactory) CreateTraceProcessor( + logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg configmodels.Processor, ) (processor.TraceProcessor, error) { @@ -165,6 +166,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor( // CreateMetricsProcessor creates a metrics processor based on this config. func (f *ExampleProcessorFactory) CreateMetricsProcessor( + logger *zap.Logger, nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor, ) (processor.MetricsProcessor, error) { diff --git a/processor/addattributesprocessor/config_test.go b/processor/addattributesprocessor/config_test.go index 101fe8b3c6b..f40f78c9f2f 100644 --- a/processor/addattributesprocessor/config_test.go +++ b/processor/addattributesprocessor/config_test.go @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) { &ConfigV2{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "attributes", + NameVal: "attributes/2", }, Values: map[string]interface{}{ "attribute1": 123, diff --git a/processor/addattributesprocessor/factory.go b/processor/addattributesprocessor/factory.go index a224999c7fd..f120c6aed07 100644 --- a/processor/addattributesprocessor/factory.go +++ b/processor/addattributesprocessor/factory.go @@ -15,6 +15,8 @@ package addattributesprocessor import ( + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/internal/configmodels" "github.com/open-telemetry/opentelemetry-service/internal/factories" @@ -28,20 +30,21 @@ const ( typeStr = "attributes" ) -// processorFactory is the factory for OpenCensus exporter. +// processorFactory is the factory for Attributes processor. type processorFactory struct { } -// Type gets the type of the Option config created by this factory. +// Type gets the type of the config created by this factory. func (f *processorFactory) Type() string { return typeStr } -// CreateDefaultConfig creates the default configuration for exporter. +// CreateDefaultConfig creates the default configuration for processor. func (f *processorFactory) CreateDefaultConfig() configmodels.Processor { return &ConfigV2{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: typeStr, + NameVal: typeStr, }, Values: map[string]interface{}{}, } @@ -49,6 +52,7 @@ func (f *processorFactory) CreateDefaultConfig() configmodels.Processor { // CreateTraceProcessor creates a trace processor based on this config. func (f *processorFactory) CreateTraceProcessor( + logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg configmodels.Processor, ) (processor.TraceProcessor, error) { @@ -58,6 +62,7 @@ func (f *processorFactory) CreateTraceProcessor( // CreateMetricsProcessor creates a metrics processor based on this config. func (f *processorFactory) CreateMetricsProcessor( + logger *zap.Logger, nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor, ) (processor.MetricsProcessor, error) { diff --git a/processor/addattributesprocessor/factory_test.go b/processor/addattributesprocessor/factory_test.go index 71206c05084..1d71d50f7ab 100644 --- a/processor/addattributesprocessor/factory_test.go +++ b/processor/addattributesprocessor/factory_test.go @@ -17,6 +17,8 @@ package addattributesprocessor import ( "testing" + "go.uber.org/zap" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -37,11 +39,11 @@ func TestCreateProcessor(t *testing.T) { cfg := factory.CreateDefaultConfig() - tp, err := factory.CreateTraceProcessor(nil, cfg) + tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg) assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") - mp, err := factory.CreateMetricsProcessor(nil, cfg) + mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) assert.Nil(t, mp) assert.Error(t, err, "should not be able to create metric processor") }