Skip to content

Commit

Permalink
Add factory and new-style config for Batch processor
Browse files Browse the repository at this point in the history
This is part of remaining migration to new configuration format.

Github issue: open-telemetry#52

Testing done: make
  • Loading branch information
Tigran Najaryan committed Jun 25, 2019
1 parent 026061c commit 84afd77
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 0 deletions.
45 changes: 45 additions & 0 deletions internal/collector/processor/nodebatcher/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"time"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
)

// ConfigV2 defines configuration for Attributes processor.
type ConfigV2 struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// Timeout sets the time after which a batch will be sent regardless of size
Timeout *time.Duration `mapstructure:"timeout,omitempty"`

// SendBatchSize is the size of a batch which after hit, will trigger it to be sent.
SendBatchSize *int `mapstructure:"send-batch-size,omitempty"`

// NumTickers sets the number of tickers to use to divide the work of looping
// over batch buckets. This is an advanced configuration option.
NumTickers int `mapstructure:"num-tickers,omitempty"`

// TickTime sets time interval at which the tickers tick. This is an advanced
// configuration option.
TickTime *time.Duration `mapstructure:"tick-time,omitempty"`

// RemoveAfterTicks is the number of ticks that must pass without a span arriving
// from a node after which the batcher for that node will be deleted. This is an
// advanved configuration option.
RemoveAfterTicks *int `mapstructure:"remove-after-ticks,omitempty"`
}
62 changes: 62 additions & 0 deletions internal/collector/processor/nodebatcher/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/configv2"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

var _ = configv2.RegisterTestFactories()

func TestLoadConfig(t *testing.T) {

factory := factories.GetProcessorFactory(typeStr)

config, err := configv2.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"))

require.Nil(t, err)
require.NotNil(t, config)

p0 := config.Processors["batch"]
assert.Equal(t, p0, factory.CreateDefaultConfig())

p1 := config.Processors["batch/2"]

timeout := time.Second * 10
tickTime := time.Second * 5
removeAfterTicks := 20
sendBatchSize := 1000

assert.Equal(t, p1,
&ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "batch",
},
Timeout: &timeout,
NumTickers: 10,
RemoveAfterTicks: &removeAfterTicks,
SendBatchSize: &sendBatchSize,
TickTime: &tickTime,
})
}
100 changes: 100 additions & 0 deletions internal/collector/processor/nodebatcher/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/factories"
"github.com/open-telemetry/opentelemetry-service/processor"
)

var _ = factories.RegisterProcessorFactory(&processorFactory{})

const (
// The value of "type" key in configuration.
typeStr = "batch"
)

// processorFactory is the factory for OpenCensus exporter.
type processorFactory struct {
}

// Type gets the type of the Option config created by this factory.
func (f *processorFactory) Type() string {
return typeStr
}

// CreateDefaultConfig creates the default configuration for exporter.
func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {
removeAfterTicks := int(defaultRemoveAfterCycles)
sendBatchSize := int(defaultSendBatchSize)
tickTime := defaultTickTime
timeout := defaultTimeout

return &ConfigV2{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
},
RemoveAfterTicks: &removeAfterTicks,
SendBatchSize: &sendBatchSize,
NumTickers: defaultNumTickers,
TickTime: &tickTime,
Timeout: &timeout,
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *processorFactory) CreateTraceProcessor(
nextConsumer consumer.TraceConsumer,
c configmodels.Processor,
) (processor.TraceProcessor, error) {
cfg := c.(*ConfigV2)

var batchingOptions []Option
if cfg.Timeout != nil {
batchingOptions = append(batchingOptions, WithTimeout(*cfg.Timeout))
}
if cfg.NumTickers > 0 {
batchingOptions = append(
batchingOptions, WithNumTickers(cfg.NumTickers),
)
}
if cfg.TickTime != nil {
batchingOptions = append(
batchingOptions, WithTickTime(*cfg.TickTime),
)
}
if cfg.SendBatchSize != nil {
batchingOptions = append(
batchingOptions, WithSendBatchSize(*cfg.SendBatchSize),
)
}
if cfg.RemoveAfterTicks != nil {
batchingOptions = append(
batchingOptions, WithRemoveAfterTicks(*cfg.RemoveAfterTicks),
)
}

return NewBatcher("", nil, nextConsumer, batchingOptions...), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *processorFactory) CreateMetricsProcessor(
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
}
47 changes: 47 additions & 0 deletions internal/collector/processor/nodebatcher/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nodebatcher

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/internal/factories"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
}

func TestCreateProcessor(t *testing.T) {
factory := factories.GetProcessorFactory(typeStr)
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(nil, cfg)
assert.NotNil(t, tp)
assert.NoError(t, err, "cannot create trace processor")

mp, err := factory.CreateMetricsProcessor(nil, cfg)
assert.Nil(t, mp)
assert.Error(t, err, "should not be able to create metric processor")
}
20 changes: 20 additions & 0 deletions internal/collector/processor/nodebatcher/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
receivers:
examplereceiver:

processors:
batch:
batch/2:
timeout: 10s
send-batch-size: 1000
num-tickers: 10
tick-time: 5s
remove-after-ticks: 20

exporters:
exampleexporter:

pipelines:
traces:
receivers: [examplereceiver]
processors: [batch/2]
exporters: [exampleexporter]

0 comments on commit 84afd77

Please sign in to comment.