Skip to content

Commit

Permalink
Add factory and new-style config for Batch processor
Browse files Browse the repository at this point in the history
This is part of remaining migration to new configuration format.

- Added zap.Logger to ProcessorFactory functions because it is
needed by this processor (and can be used if needed in other processors
in the future).

- Added Name/SetName to Processor interface because it is
needed by this processor (this now also makes Processor uniform with
Receiver and Exporter interfaces).

Github issue: open-telemetry#52

Testing done: make
  • Loading branch information
Tigran Najaryan committed Jun 25, 2019
1 parent 026061c commit ffa30bd
Show file tree
Hide file tree
Showing 16 changed files with 322 additions and 10 deletions.
4 changes: 2 additions & 2 deletions cmd/occollector/app/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func (pb *PipelinesBuilder) buildPipeline(
var err error
switch pipelineCfg.InputType {
case configmodels.TracesDataType:
tc, err = factory.CreateTraceProcessor(tc, procCfg)
tc, err = factory.CreateTraceProcessor(pb.logger, tc, procCfg)
case configmodels.MetricsDataType:
mc, err = factory.CreateMetricsProcessor(mc, procCfg)
mc, err = factory.CreateMetricsProcessor(pb.logger, mc, procCfg)
}

if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/occollector/app/collector/testdata/otelsvc-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ exporters:
processors:
attributes:
enabled: true
batch:
enabled: true

pipelines:
traces:
receivers: [jaeger]
processors: [attributes]
processors: [attributes, batch]
exporters: [opencensus]
45 changes: 45 additions & 0 deletions internal/collector/processor/nodebatcher/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"time"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
)

// ConfigV2 defines configuration for batch processor.
type ConfigV2 struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// Timeout sets the time after which a batch will be sent regardless of size
Timeout *time.Duration `mapstructure:"timeout,omitempty"`

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
SendBatchSize *int `mapstructure:"send-batch-size,omitempty"`

// NumTickers sets the number of tickers to use to divide the work of looping
// over batch buckets. This is an advanced configuration option.
NumTickers int `mapstructure:"num-tickers,omitempty"`

// TickTime sets time interval at which the tickers tick. This is an advanced
// configuration option.
TickTime *time.Duration `mapstructure:"tick-time,omitempty"`

// RemoveAfterTicks is the number of ticks that must pass without a span arriving
// from a node after which the batcher for that node will be deleted. This is an
// advanved configuration option.
RemoveAfterTicks *int `mapstructure:"remove-after-ticks,omitempty"`
}
63 changes: 63 additions & 0 deletions internal/collector/processor/nodebatcher/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/configv2"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

var _ = configv2.RegisterTestFactories()

func TestLoadConfig(t *testing.T) {

factory := factories.GetProcessorFactory(typeStr)

config, err := configv2.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"))

require.Nil(t, err)
require.NotNil(t, config)

p0 := config.Processors["batch"]
assert.Equal(t, p0, factory.CreateDefaultConfig())

p1 := config.Processors["batch/2"]

timeout := time.Second * 10
tickTime := time.Second * 5
removeAfterTicks := 20
sendBatchSize := 1000

assert.Equal(t, p1,
&ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch/2",
},
Timeout: &timeout,
NumTickers: 10,
RemoveAfterTicks: &removeAfterTicks,
SendBatchSize: &sendBatchSize,
TickTime: &tickTime,
})
}
105 changes: 105 additions & 0 deletions internal/collector/processor/nodebatcher/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
"github.com/open-telemetry/opentelemetry-service/processor"
)

var _ = factories.RegisterProcessorFactory(&processorFactory{})

const (
// The value of "type" key in configuration.
typeStr = "batch"
)

// processorFactory is the factory for batch processor.
type processorFactory struct {
}

// Type gets the type of the config created by this factory.
func (f *processorFactory) Type() string {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {
removeAfterTicks := int(defaultRemoveAfterCycles)
sendBatchSize := int(defaultSendBatchSize)
tickTime := defaultTickTime
timeout := defaultTimeout

return &ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
RemoveAfterTicks: &removeAfterTicks,
SendBatchSize: &sendBatchSize,
NumTickers: defaultNumTickers,
TickTime: &tickTime,
Timeout: &timeout,
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *processorFactory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
c configmodels.Processor,
) (processor.TraceProcessor, error) {
cfg := c.(*ConfigV2)

var batchingOptions []Option
if cfg.Timeout != nil {
batchingOptions = append(batchingOptions, WithTimeout(*cfg.Timeout))
}
if cfg.NumTickers > 0 {
batchingOptions = append(
batchingOptions, WithNumTickers(cfg.NumTickers),
)
}
if cfg.TickTime != nil {
batchingOptions = append(
batchingOptions, WithTickTime(*cfg.TickTime),
)
}
if cfg.SendBatchSize != nil {
batchingOptions = append(
batchingOptions, WithSendBatchSize(*cfg.SendBatchSize),
)
}
if cfg.RemoveAfterTicks != nil {
batchingOptions = append(
batchingOptions, WithRemoveAfterTicks(*cfg.RemoveAfterTicks),
)
}

return NewBatcher(cfg.NameVal, logger, nextConsumer, batchingOptions...), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *processorFactory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
}
49 changes: 49 additions & 0 deletions internal/collector/processor/nodebatcher/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"testing"

"go.uber.org/zap"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
}

func TestCreateProcessor(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
assert.Nil(t, mp)
assert.Error(t, err, "should not be able to create metric processor")
}
20 changes: 20 additions & 0 deletions internal/collector/processor/nodebatcher/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
receivers:
examplereceiver:

processors:
batch:
batch/2:
timeout: 10s
send-batch-size: 1000
num-tickers: 10
tick-time: 5s
remove-after-ticks: 20

exporters:
exampleexporter:

pipelines:
traces:
receivers: [examplereceiver]
processors: [batch/2]
exporters: [exampleexporter]
12 changes: 12 additions & 0 deletions internal/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Exporters map[string]Exporter
// Processor is the configuration of a processor. Specific processors must implement this
// interface and will typically embed ProcessorSettings struct or a struct that extends it.
type Processor interface {
NamedEntity
Type() string
SetType(typeStr string)
}
Expand Down Expand Up @@ -188,9 +189,20 @@ func (es *ExporterSettings) SetType(typeStr string) {
// Specific processors can embed this struct and extend it with more fields if needed.
type ProcessorSettings struct {
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
}

// Name gets the processor name.
func (proc *ProcessorSettings) Name() string {
return proc.NameVal
}

// SetName sets the processor name.
func (proc *ProcessorSettings) SetName(name string) {
proc.NameVal = name
}

// Type sets the processor type.
func (proc *ProcessorSettings) Type() string {
return proc.TypeVal
Expand Down
1 change: 1 addition & 0 deletions internal/configv2/configv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func loadProcessors(v *viper.Viper) (configmodels.Processors, error) {
// Create the default config for this processors
processorCfg := factory.CreateDefaultConfig()
processorCfg.SetType(typeStr)
processorCfg.SetName(fullName)

// Now that the default config struct is created we can Unmarshal into it
// and it will apply user-defined config on top of the default.
Expand Down
1 change: 1 addition & 0 deletions internal/configv2/configv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func TestDecodeConfig(t *testing.T) {
&ExampleProcessor{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "exampleprocessor",
NameVal: "exampleprocessor",
Enabled: false,
},
ExtraSetting: "some export string",
Expand Down
3 changes: 3 additions & 0 deletions internal/configv2/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/internal/factories"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
"go.uber.org/zap"
)

// ExampleReceiver is for testing purposes. We are defining an example config and factory
Expand Down Expand Up @@ -302,6 +303,7 @@ func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor {

// CreateTraceProcessor creates a trace processor based on this config.
func (f *ExampleProcessorFactory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
Expand All @@ -310,6 +312,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor(

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *ExampleProcessorFactory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
Expand Down
Loading

0 comments on commit ffa30bd

Please sign in to comment.