Skip to content

Commit

Permalink
Fix based on PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tigran Najaryan committed Jun 20, 2019
1 parent 1a51c94 commit 320174d
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 32 deletions.
6 changes: 2 additions & 4 deletions cmd/occollector/app/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (eb *ExportersBuilder) buildExporter(
exporter.stop = combineStopFunc(exporter.stop, stopFunc)
}

eb.logger.Info(fmt.Sprintf("Exporter %s is enabled.", config.Name()))
eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name()))

return exporter, nil
}
Expand All @@ -206,9 +206,7 @@ func typeMismatchErr(
requiredByPipeline *configmodels.Pipeline,
dataType configmodels.DataType,
) error {
return fmt.Errorf(
"pipeline %s is attached %s to exporter %s which does not support %s "+
"telemetry data produced by pipeline. Exporter will be detached from pipeline",
return fmt.Errorf("%s is a %s pipeline but has a %s which does not support %s",
requiredByPipeline.Name, dataType.GetString(),
config.Name(), dataType.GetString(),
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/occollector/app/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (pb *PipelinesBuilder) buildPipeline(
}
}

pb.logger.Info(fmt.Sprintf("Pipeline %s enabled.", pipelineCfg.Name))
pb.logger.Info("Pipeline is enabled.", zap.String("pipelines", pipelineCfg.Name))

return &builtProcessor{tc, mc}, nil
}
Expand Down
13 changes: 7 additions & 6 deletions cmd/occollector/app/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ func (rcvs Receivers) StopAll() {
// StartAll starts all receivers.
func (rcvs Receivers) StartAll(logger *zap.Logger, asyncErrorChan chan<- error) error {
for cfg, rcv := range rcvs {
logger.Info(fmt.Sprintf("Receiver %s starting...", cfg.Name()))
logger.Info("Receiver is starting...", zap.String("receiver", cfg.Name()))

if err := rcv.Start(asyncErrorChan); err != nil {
return err
}
logger.Info(fmt.Sprintf("Receiver %s started.", cfg.Name()))
logger.Info("Receiver is started.", zap.String("receiver", cfg.Name()))
}
return nil
}
Expand Down Expand Up @@ -197,17 +198,17 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
if err != nil {
if err == factories.ErrDataTypeIsNotSupported {
return fmt.Errorf(
"receiver %s does not support %s but some pipelines that "+
"want to process %s are attached to the receiever",
"receiver %s does not support %s but it was used in a "+
"%s pipeline",
config.Name(),
dataType.GetString(),
dataType.GetString())
}
return fmt.Errorf("cannot create receiver %s", config.Name())
}

rb.logger.Info(fmt.Sprintf("Receiver %s is enabled for %s.",
config.Name(), dataType.GetString()))
rb.logger.Info("Receiver is enabled.",
zap.String("receiver", config.Name()), zap.String("datatype", dataType.GetString()))

return nil
}
Expand Down
34 changes: 17 additions & 17 deletions cmd/occollector/app/builder/receivers_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,23 +195,23 @@ func testReceivers(
}
}

func TestReceiversBuilder_Error(t *testing.T) {
//config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
//require.Nil(t, err)
//
//// Corrupt the pipeline, change data type to metrics. We have to forcedly do it here
//// since there is no way to have such config loaded by LoadConfigFile, it would not
//// pass validation. We are doing this to test failure mode of PipelinesBuilder.
//pipeline := config.Pipelines["traces"]
//pipeline.InputType = configmodels.MetricsDataType
//
//exporters, err := NewExportersBuilder(zap.NewNop(), config).Build()
//
//// This should fail because "attributes" processor defined in the config does
//// not support metrics data type.
//_, err = NewPipelinesBuilder(zap.NewNop(), config, exporters).Build()
//
//assert.NotNil(t, err)
func TestReceiversBuilder_DataTypeError(t *testing.T) {
config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml")
require.Nil(t, err)

// Make examplereceiver to "unsupport" trace data type.
receiver := config.Receivers["examplereceiver"]
receiver.(*configv2.ExampleReceiver).FailTraceCreation = true

// Build the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), config).Build()
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), config, allExporters).Build()
receivers, err := NewReceiversBuilder(zap.NewNop(), config, pipelineProcessors).Build()

// This should fail because "examplereceiver" is attached to "traces" pipeline
// which is a configuration error.
assert.NotNil(t, err)
assert.Nil(t, receivers)
}

func TestReceiversBuilder_StartAll(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,12 @@ type ReceiverSettings struct {
Endpoint string `mapstructure:"endpoint"`
}

// Name gets the exporter name.
// Name gets the receiver name.
func (rs *ReceiverSettings) Name() string {
return rs.NameVal
}

// SetName sets the exporter name.
// SetName sets the receiver name.
func (rs *ReceiverSettings) SetName(name string) {
rs.NameVal = name
}
Expand Down
14 changes: 12 additions & 2 deletions internal/configv2/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
type ExampleReceiver struct {
configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
ExtraSetting string `mapstructure:"extra"`

// FailTraceCreation causes CreateTraceReceiver to fail. Useful for testing.
FailTraceCreation bool `mapstructure:"-"`

// FailMetricsCreation causes CreateTraceReceiver to fail. Useful for testing.
FailMetricsCreation bool `mapstructure:"-"`
}

// ExampleReceiverFactory is factory for ExampleReceiver.
Expand Down Expand Up @@ -64,7 +70,9 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver(
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumer,
) (receiver.TraceReceiver, error) {
// Not used for this test, just return nil
if cfg.(*ExampleReceiver).FailTraceCreation {
return nil, factories.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil
}

Expand All @@ -73,7 +81,9 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumer,
) (receiver.MetricsReceiver, error) {
// Not used for this test, just return nil
if cfg.(*ExampleReceiver).FailMetricsCreation {
return nil, factories.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil
}

Expand Down

0 comments on commit 320174d

Please sign in to comment.