Skip to content

Commit

Permalink
Add factory and new-style config for Batch processor (#53)
Browse files Browse the repository at this point in the history
This is part of remaining migration to new configuration format.

- Added zap.Logger to ProcessorFactory functions because it is
needed by this processor (and can be used if needed in other processors
in the future).

- Added Name/SetName to Processor interface because it is
needed by this processor (this now also makes Processor uniform with
Receiver and Exporter interfaces).

Github issue: open-telemetry/opentelemetry-collector#52

Testing done: make
  • Loading branch information
tigrannajaryan authored and Paulo Janotti committed Jun 25, 2019
1 parent 38f1390 commit 5553892
Show file tree
Hide file tree
Showing 19 changed files with 334 additions and 15 deletions.
4 changes: 2 additions & 2 deletions cmd/occollector/app/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ func (pb *PipelinesBuilder) buildPipeline(
var err error
switch pipelineCfg.InputType {
case configmodels.TracesDataType:
tc, err = factory.CreateTraceProcessor(tc, procCfg)
tc, err = factory.CreateTraceProcessor(pb.logger, tc, procCfg)
case configmodels.MetricsDataType:
mc, err = factory.CreateMetricsProcessor(mc, procCfg)
mc, err = factory.CreateMetricsProcessor(pb.logger, mc, procCfg)
}

if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/occollector/app/collector/testdata/otelsvc-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ processors:
enabled: true
queued-retry:
enabled: true
batch:
enabled: true

pipelines:
traces:
receivers: [jaeger]
processors: [attributes, queued-retry]
processors: [attributes, batch, queued-retry]
exporters: [opencensus]
45 changes: 45 additions & 0 deletions internal/collector/processor/nodebatcher/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"time"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
)

// ConfigV2 defines configuration for batch processor.
type ConfigV2 struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// Timeout sets the time after which a batch will be sent regardless of size.
Timeout *time.Duration `mapstructure:"timeout,omitempty"`

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
SendBatchSize *int `mapstructure:"send-batch-size,omitempty"`

// NumTickers sets the number of tickers to use to divide the work of looping
// over batch buckets. This is an advanced configuration option.
NumTickers int `mapstructure:"num-tickers,omitempty"`

// TickTime sets time interval at which the tickers tick. This is an advanced
// configuration option.
TickTime *time.Duration `mapstructure:"tick-time,omitempty"`

// RemoveAfterTicks is the number of ticks that must pass without a span arriving
// from a node after which the batcher for that node will be deleted. This is an
// advanced configuration option.
RemoveAfterTicks *int `mapstructure:"remove-after-ticks,omitempty"`
}
63 changes: 63 additions & 0 deletions internal/collector/processor/nodebatcher/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/configv2"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

var _ = configv2.RegisterTestFactories()

func TestLoadConfig(t *testing.T) {

factory := factories.GetProcessorFactory(typeStr)

config, err := configv2.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"))

require.Nil(t, err)
require.NotNil(t, config)

p0 := config.Processors["batch"]
assert.Equal(t, p0, factory.CreateDefaultConfig())

p1 := config.Processors["batch/2"]

timeout := time.Second * 10
tickTime := time.Second * 5
removeAfterTicks := 20
sendBatchSize := 1000

assert.Equal(t, p1,
&ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
NameVal: "batch/2",
},
Timeout: &timeout,
NumTickers: 10,
RemoveAfterTicks: &removeAfterTicks,
SendBatchSize: &sendBatchSize,
TickTime: &tickTime,
})
}
105 changes: 105 additions & 0 deletions internal/collector/processor/nodebatcher/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
"github.com/open-telemetry/opentelemetry-service/processor"
)

var _ = factories.RegisterProcessorFactory(&processorFactory{})

const (
// The value of "type" key in configuration.
typeStr = "batch"
)

// processorFactory is the factory for batch processor.
type processorFactory struct {
}

// Type gets the type of the config created by this factory.
func (f *processorFactory) Type() string {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {
removeAfterTicks := int(defaultRemoveAfterCycles)
sendBatchSize := int(defaultSendBatchSize)
tickTime := defaultTickTime
timeout := defaultTimeout

return &ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
RemoveAfterTicks: &removeAfterTicks,
SendBatchSize: &sendBatchSize,
NumTickers: defaultNumTickers,
TickTime: &tickTime,
Timeout: &timeout,
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *processorFactory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
c configmodels.Processor,
) (processor.TraceProcessor, error) {
cfg := c.(*ConfigV2)

var batchingOptions []Option
if cfg.Timeout != nil {
batchingOptions = append(batchingOptions, WithTimeout(*cfg.Timeout))
}
if cfg.NumTickers > 0 {
batchingOptions = append(
batchingOptions, WithNumTickers(cfg.NumTickers),
)
}
if cfg.TickTime != nil {
batchingOptions = append(
batchingOptions, WithTickTime(*cfg.TickTime),
)
}
if cfg.SendBatchSize != nil {
batchingOptions = append(
batchingOptions, WithSendBatchSize(*cfg.SendBatchSize),
)
}
if cfg.RemoveAfterTicks != nil {
batchingOptions = append(
batchingOptions, WithRemoveAfterTicks(*cfg.RemoveAfterTicks),
)
}

return NewBatcher(cfg.NameVal, logger, nextConsumer, batchingOptions...), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *processorFactory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
}
49 changes: 49 additions & 0 deletions internal/collector/processor/nodebatcher/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"testing"

"go.uber.org/zap"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
}

func TestCreateProcessor(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
assert.Nil(t, mp)
assert.Error(t, err, "should not be able to create metric processor")
}
20 changes: 20 additions & 0 deletions internal/collector/processor/nodebatcher/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
receivers:
examplereceiver:

processors:
batch:
batch/2:
timeout: 10s
send-batch-size: 1000
num-tickers: 10
tick-time: 5s
remove-after-ticks: 20

exporters:
exampleexporter:

pipelines:
traces:
receivers: [examplereceiver]
processors: [batch/2]
exporters: [exampleexporter]
1 change: 1 addition & 0 deletions internal/collector/processor/queued/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestLoadConfig(t *testing.T) {
&ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "queued-retry",
NameVal: "queued-retry/2",
},
NumWorkers: 2,
QueueSize: 10,
Expand Down
4 changes: 4 additions & 0 deletions internal/collector/processor/queued/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
"github.com/open-telemetry/opentelemetry-service/processor"
"go.uber.org/zap"
)

var _ = factories.RegisterProcessorFactory(&processorFactory{})
Expand All @@ -44,6 +45,7 @@ func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {
return &ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
NumWorkers: 10,
QueueSize: 5000,
Expand All @@ -54,6 +56,7 @@ func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {

// CreateTraceProcessor creates a trace processor based on this config.
func (f *processorFactory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
Expand All @@ -68,6 +71,7 @@ func (f *processorFactory) CreateTraceProcessor(

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *processorFactory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
Expand Down
6 changes: 4 additions & 2 deletions internal/collector/processor/queued/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package queued
import (
"testing"

"go.uber.org/zap"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -37,11 +39,11 @@ func TestCreateProcessor(t *testing.T) {

cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(nil, cfg)
tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")

mp, err := factory.CreateMetricsProcessor(nil, cfg)
mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
assert.Nil(t, mp)
assert.Error(t, err, "should not be able to create metric processor")
}
Loading

0 comments on commit 5553892

Please sign in to comment.