Skip to content

Commit

Permalink
Move ReceiverFactory to "receiver" package (open-telemetry#86)
Browse files Browse the repository at this point in the history
This is part 1 of 3 of the refactoring plan to eliminate factories.go
and move the content to corresponding packages (receiver, exporter, processor).
  • Loading branch information
tigrannajaryan authored Jul 2, 2019
1 parent a61a358 commit 4979bbd
Show file tree
Hide file tree
Showing 27 changed files with 262 additions and 204 deletions.
4 changes: 2 additions & 2 deletions cmd/occollector/app/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (eb *ExportersBuilder) buildExporter(
// Traces data type is required. Create a trace exporter based on config.
tc, stopFunc, err := factory.CreateTraceExporter(eb.logger, config)
if err != nil {
if err == factories.ErrDataTypeIsNotSupported {
if err == models.ErrDataTypeIsNotSupported {
// Could not create because this exporter does not support this data type.
return nil, typeMismatchErr(config, requirement.requiredBy, models.TracesDataType)
}
Expand All @@ -185,7 +185,7 @@ func (eb *ExportersBuilder) buildExporter(
// Metrics data type is required. Create a trace exporter based on config.
mc, stopFunc, err := factory.CreateMetricsExporter(eb.logger, config)
if err != nil {
if err == factories.ErrDataTypeIsNotSupported {
if err == models.ErrDataTypeIsNotSupported {
// Could not create because this exporter does not support this data type.
return nil, typeMismatchErr(config, requirement.requiredBy, models.MetricsDataType)
}
Expand Down
19 changes: 9 additions & 10 deletions cmd/occollector/app/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/factories"
"github.com/open-telemetry/opentelemetry-service/internal"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
Expand Down Expand Up @@ -172,10 +171,10 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config models.Receiver) (attac
}

func (rb *ReceiversBuilder) attachReceiverToPipelines(
factory factories.ReceiverFactory,
factory receiver.Factory,
dataType models.DataType,
config models.Receiver,
receiver *builtReceiver,
rcv *builtReceiver,
pipelineProcessors []*builtProcessor,
) error {
// There are pipelines of the specified data type that must be attached to
Expand All @@ -188,15 +187,15 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
junction := buildFanoutTraceConsumer(pipelineProcessors)

// Now create the receiver and tell it to send to the junction point.
receiver.trace, err = factory.CreateTraceReceiver(context.Background(), rb.logger, config, junction)
rcv.trace, err = factory.CreateTraceReceiver(context.Background(), rb.logger, config, junction)

case models.MetricsDataType:
junction := buildFanoutMetricConsumer(pipelineProcessors)
receiver.metrics, err = factory.CreateMetricsReceiver(rb.logger, config, junction)
rcv.metrics, err = factory.CreateMetricsReceiver(rb.logger, config, junction)
}

if err != nil {
if err == factories.ErrDataTypeIsNotSupported {
if err == models.ErrDataTypeIsNotSupported {
return fmt.Errorf(
"receiver %s does not support %s but it was used in a "+
"%s pipeline",
Expand All @@ -222,8 +221,8 @@ func (rb *ReceiversBuilder) buildReceiver(config models.Receiver) (*builtReceive
}

// Prepare to build the receiver.
factory := factories.GetReceiverFactory(config.Type())
receiver := &builtReceiver{}
factory := receiver.GetReceiverFactory(config.Type())
rcv := &builtReceiver{}

// Now we have list of pipelines broken down by data type. Iterate for each data type.
for dataType, pipelines := range pipelinesToAttach {
Expand All @@ -234,13 +233,13 @@ func (rb *ReceiversBuilder) buildReceiver(config models.Receiver) (*builtReceive

// Attach the corresponding part of the receiver to all pipelines that require
// this data type.
err := rb.attachReceiverToPipelines(factory, dataType, config, receiver, pipelines)
err := rb.attachReceiverToPipelines(factory, dataType, config, rcv, pipelines)
if err != nil {
return nil, err
}
}

return receiver, nil
return rcv, nil
}

func buildFanoutTraceConsumer(pipelineFrontProcessors []*builtProcessor) consumer.TraceConsumer {
Expand Down
3 changes: 2 additions & 1 deletion configv2/configv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/open-telemetry/opentelemetry-service/factories"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/receiver"
)

// These are errors that can be returned by Load(). Note that error codes are not part
Expand Down Expand Up @@ -181,7 +182,7 @@ func loadReceivers(v *viper.Viper) (models.Receivers, error) {
}

// Find receiver factory based on "type" that we read from config source
factory := factories.GetReceiverFactory(typeStr)
factory := receiver.GetReceiverFactory(typeStr)
if factory == nil {
return nil, &configError{
code: errUnknownReceiverType,
Expand Down
16 changes: 8 additions & 8 deletions configv2/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (f *ExampleReceiverFactory) Type() string {
}

// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this factory.
func (f *ExampleReceiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler {
func (f *ExampleReceiverFactory) CustomUnmarshaler() receiver.CustomUnmarshaler {
return nil
}

Expand All @@ -73,7 +73,7 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver(
nextConsumer consumer.TraceConsumer,
) (receiver.TraceReceiver, error) {
if cfg.(*ExampleReceiver).FailTraceCreation {
return nil, factories.ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil
}
Expand All @@ -85,7 +85,7 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
nextConsumer consumer.MetricsConsumer,
) (receiver.MetricsReceiver, error) {
if cfg.(*ExampleReceiver).FailMetricsCreation {
return nil, factories.ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (f *MultiProtoReceiverFactory) Type() string {
}

// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this factory.
func (f *MultiProtoReceiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler {
func (f *MultiProtoReceiverFactory) CustomUnmarshaler() receiver.CustomUnmarshaler {
return nil
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor(
nextConsumer consumer.TraceConsumer,
cfg models.Processor,
) (processor.TraceProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}

// CreateMetricsProcessor creates a metrics processor based on this config.
Expand All @@ -320,13 +320,13 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor(
nextConsumer consumer.MetricsConsumer,
cfg models.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}

// RegisterTestFactories registers example factories. This is only used by tests.
func RegisterTestFactories() error {
_ = factories.RegisterReceiverFactory(&ExampleReceiverFactory{})
_ = factories.RegisterReceiverFactory(&MultiProtoReceiverFactory{})
_ = receiver.RegisterReceiverFactory(&ExampleReceiverFactory{})
_ = receiver.RegisterReceiverFactory(&MultiProtoReceiverFactory{})
_ = factories.RegisterExporterFactory(&ExampleExporterFactory{})
_ = factories.RegisterProcessorFactory(&ExampleProcessorFactory{})
return nil
Expand Down
2 changes: 1 addition & 1 deletion exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,5 +150,5 @@ func (f *exporterFactory) CreateTraceExporter(logger *zap.Logger, config models.

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *exporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg models.Exporter) (consumer.MetricsConsumer, factories.StopFunc, error) {
return nil, nil, factories.ErrDataTypeIsNotSupported
return nil, nil, models.ErrDataTypeIsNotSupported
}
3 changes: 2 additions & 1 deletion exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/open-telemetry/opentelemetry-service/factories"
"github.com/open-telemetry/opentelemetry-service/internal/compression"
"github.com/open-telemetry/opentelemetry-service/models"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand All @@ -40,7 +41,7 @@ func TestCreateMetricsExporter(t *testing.T) {
cfg := factory.CreateDefaultConfig()

_, _, err := factory.CreateMetricsExporter(zap.NewNop(), cfg)
assert.Error(t, err, factories.ErrDataTypeIsNotSupported)
assert.Error(t, err, models.ErrDataTypeIsNotSupported)
}

func TestCreateTraceExporter(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion exporter/opencensusexporter/opencensusexporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/spf13/viper"

"github.com/open-telemetry/opentelemetry-service/data"
)

func TestOpenCensusTraceExportersFromViper(t *testing.T) {
Expand Down
60 changes: 0 additions & 60 deletions factories/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,75 +15,15 @@
package factories

import (
"context"
"errors"
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
)

///////////////////////////////////////////////////////////////////////////////
// Receiver factory and its registry.

// ReceiverFactory is factory interface for receivers.
type ReceiverFactory interface {
// Type gets the type of the Receiver created by this factory.
Type() string

// CreateDefaultConfig creates the default configuration for the Receiver.
CreateDefaultConfig() models.Receiver

// CustomUnmarshaler returns a custom unmarshaler for the configuration or nil if
// there is no need for custom unmarshaling. This is typically used if viper.Unmarshal()
// is not sufficient to unmarshal correctly.
CustomUnmarshaler() CustomUnmarshaler

// CreateTraceReceiver creates a trace receiver based on this config.
// If the receiver type does not support tracing or if the config is not valid
// error will be returned instead.
CreateTraceReceiver(ctx context.Context, logger *zap.Logger, cfg models.Receiver,
nextConsumer consumer.TraceConsumer) (receiver.TraceReceiver, error)

// CreateMetricsReceiver creates a metrics receiver based on this config.
// If the receiver type does not support metrics or if the config is not valid
// error will be returned instead.
CreateMetricsReceiver(logger *zap.Logger, cfg models.Receiver,
consumer consumer.MetricsConsumer) (receiver.MetricsReceiver, error)
}

// ErrDataTypeIsNotSupported can be returned by CreateTraceReceiver or
// CreateMetricsReceiver if the particular telemetry data type is not supported
// by the receiver.
var ErrDataTypeIsNotSupported = errors.New("telemetry type is not supported")

// CustomUnmarshaler is a function that un-marshals a viper data into a config struct
// in a custom way.
type CustomUnmarshaler func(v *viper.Viper, viperKey string, intoCfg interface{}) error

// List of registered receiver factories.
var receiverFactories = make(map[string]ReceiverFactory)

// RegisterReceiverFactory registers a receiver factory.
func RegisterReceiverFactory(factory ReceiverFactory) error {
if receiverFactories[factory.Type()] != nil {
panic(fmt.Sprintf("duplicate receiver factory %q", factory.Type()))
}

receiverFactories[factory.Type()] = factory
return nil
}

// GetReceiverFactory gets a receiver factory by type string.
func GetReceiverFactory(typeStr string) ReceiverFactory {
return receiverFactories[typeStr]
}

///////////////////////////////////////////////////////////////////////////////
// Exporter factory and its registry.

Expand Down
73 changes: 2 additions & 71 deletions factories/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,84 +15,15 @@
package factories

import (
"context"
"testing"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
)

type ExampleReceiverFactory struct {
}

// Type gets the type of the Receiver config created by this factory.
func (f *ExampleReceiverFactory) Type() string {
return "examplereceiver"
}

// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this factory.
func (f *ExampleReceiverFactory) CustomUnmarshaler() CustomUnmarshaler {
return nil
}

// CreateDefaultConfig creates the default configuration for the Receiver.
func (f *ExampleReceiverFactory) CreateDefaultConfig() models.Receiver {
return nil
}

// CreateTraceReceiver creates a trace receiver based on this config.
func (f *ExampleReceiverFactory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg models.Receiver,
nextConsumer consumer.TraceConsumer,
) (receiver.TraceReceiver, error) {
// Not used for this test, just return nil
return nil, nil
}

// CreateMetricsReceiver creates a metrics receiver based on this config.
func (f *ExampleReceiverFactory) CreateMetricsReceiver(
logger *zap.Logger,
cfg models.Receiver,
consumer consumer.MetricsConsumer,
) (receiver.MetricsReceiver, error) {
// Not used for this test, just return nil
return nil, nil
}

func TestRegisterReceiverFactory(t *testing.T) {
f := ExampleReceiverFactory{}
err := RegisterReceiverFactory(&f)
if err != nil {
t.Fatalf("cannot register factory")
}

if &f != GetReceiverFactory(f.Type()) {
t.Fatalf("cannot find factory")
}

// Verify that attempt to register a factory with duplicate name panics
panicked := false
func() {
defer func() {
if r := recover(); r != nil {
panicked = true
}
}()

err = RegisterReceiverFactory(&f)
}()

if !panicked {
t.Fatalf("must panic on double registration")
}
}

type ExampleExporterFactory struct {
}

Expand Down Expand Up @@ -163,7 +94,7 @@ func (f *ExampleProcessorFactory) CreateTraceProcessor(
nextConsumer consumer.TraceConsumer,
cfg models.Processor,
) (processor.TraceProcessor, error) {
return nil, ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}

// CreateMetricsProcessor creates a metrics processor based on this config.
Expand All @@ -172,7 +103,7 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor(
nextConsumer consumer.MetricsConsumer,
cfg models.Processor,
) (processor.MetricsProcessor, error) {
return nil, ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}

func TestRegisterProcessorFactory(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/processor/nodebatcher/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ func (f *processorFactory) CreateMetricsProcessor(
nextConsumer consumer.MetricsConsumer,
cfg models.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}
2 changes: 1 addition & 1 deletion internal/collector/processor/queued/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ func (f *processorFactory) CreateMetricsProcessor(
nextConsumer consumer.MetricsConsumer,
cfg models.Processor,
) (processor.MetricsProcessor, error) {
return nil, factories.ErrDataTypeIsNotSupported
return nil, models.ErrDataTypeIsNotSupported
}
Loading

0 comments on commit 4979bbd

Please sign in to comment.