Skip to content

Commit

Permalink
Make component interfaces uniform
Browse files Browse the repository at this point in the history
This change fixes inconsistencies in component interfaces. Motivation:

- Uniformness results in reduction of code that currently has to
  deal with differences.
- Processor.Start is missing and is important for allowing processors
  to communicate with the Host.

What's changed:

- Introduced Component interface.
- Unified Host interface.
- Added a Start function to processors (via Component interface).
- Start/Shutdown is now called for Processors from service start/shutdown.
- Receivers, Exporters, Processors, Extensions now embed Component interface.
- Replaced StartTraceReception/StartMetricsReception by single Start function for receivers.
- Replaced StopTraceReception/StopMetricsReception by single Shutdown function for receivers.

Note: before merging this we need to announce the change in Gitter since it
breaks existing implementations in contrib (although the fix is easy).

Resolves #477
Resolves #262
  • Loading branch information
Tigran Najaryan committed Jan 6, 2020
1 parent c300f13 commit e9e0317
Show file tree
Hide file tree
Showing 53 changed files with 435 additions and 332 deletions.
41 changes: 41 additions & 0 deletions component/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component

import "context"

// Component is either a receiver, exporter, processor or extension.
type Component interface {
// Start tells the component to start. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
// If this is an exporter component it may prepare for exporting
// by connecting to the endpoint.
Start(host Host) error

// Shutdown is invoked during service shutdown.
Shutdown() error
}

type Host interface {
// ReportFatalError is used to report to the host that the extension
// encountered a fatal error (i.e.: an error that the instance can't recover
// from) after its start function had already returned.
ReportFatalError(err error)

// Context returns a context provided by the host to be used on the component
// operations.
Context() context.Context
}
15 changes: 15 additions & 0 deletions component/component_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2019 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package component
8 changes: 3 additions & 5 deletions receiver/receivertest/mock_host.go → component/mock_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,17 @@

// Package receivertest define types and functions used to help test packages
// implementing the receiver package interfaces.
package receivertest
package component

import (
"context"

"github.com/open-telemetry/opentelemetry-collector/receiver"
)

// MockHost mocks a receiver.ReceiverHost for test purposes.
type MockHost struct {
}

var _ receiver.Host = (*MockHost)(nil)
var _ Host = (*MockHost)(nil)

// Context returns a context provided by the host to be used on the receiver
// operations.
Expand All @@ -43,6 +41,6 @@ func (mh *MockHost) ReportFatalError(err error) {

// NewMockHost returns a new instance of MockHost with proper defaults for most
// tests.
func NewMockHost() receiver.Host {
func NewMockHost() Host {
return &MockHost{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

// Package receivertest define types and functions used to help test packages
// implementing the receiver package interfaces.
package receivertest
package component

import (
"errors"
Expand Down
67 changes: 43 additions & 24 deletions config/example_factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configerror"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
Expand Down Expand Up @@ -82,7 +83,20 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver(
if cfg.(*ExampleReceiver).FailTraceCreation {
return nil, configerror.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil

// There must be one receiver for both metrics and traces. We maintain a map of
// receivers per config.

// Check to see if there is already a receiver for this config.
receiver, ok := exampleReceivers[cfg]
if !ok {
receiver = &ExampleReceiverProducer{}
// Remember the receiver in the map
exampleReceivers[cfg] = receiver
}
receiver.TraceConsumer = nextConsumer

return receiver, nil
}

// CreateMetricsReceiver creates a metrics receiver based on this config.
Expand All @@ -94,33 +108,44 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
if cfg.(*ExampleReceiver).FailMetricsCreation {
return nil, configerror.ErrDataTypeIsNotSupported
}
return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil

// There must be one receiver for both metrics and traces. We maintain a map of
// receivers per config.

// Check to see if there is already a receiver for this config.
receiver, ok := exampleReceivers[cfg]
if !ok {
receiver = &ExampleReceiverProducer{}
// Remember the receiver in the map
exampleReceivers[cfg] = receiver
}
receiver.MetricsConsumer = nextConsumer

return receiver, nil
}

// ExampleReceiverProducer allows producing traces and metrics for testing purposes.
type ExampleReceiverProducer struct {
TraceConsumer consumer.TraceConsumer
TraceStarted bool
TraceStopped bool
Started bool
Stopped bool
MetricsConsumer consumer.MetricsConsumer
MetricsStarted bool
MetricsStopped bool
}

// TraceSource returns the name of the trace data source.
func (erp *ExampleReceiverProducer) TraceSource() string {
return ""
}

// StartTraceReception tells the receiver to start its processing.
func (erp *ExampleReceiverProducer) StartTraceReception(host receiver.Host) error {
erp.TraceStarted = true
// Start tells the receiver to start its processing.
func (erp *ExampleReceiverProducer) Start(host component.Host) error {
erp.Started = true
return nil
}

// StopTraceReception tells the receiver that should stop reception,
func (erp *ExampleReceiverProducer) StopTraceReception() error {
erp.TraceStopped = true
// Shutdown tells the receiver that should stop reception,
func (erp *ExampleReceiverProducer) Shutdown() error {
erp.Stopped = true
return nil
}

Expand All @@ -129,17 +154,11 @@ func (erp *ExampleReceiverProducer) MetricsSource() string {
return ""
}

// StartMetricsReception tells the receiver to start its processing.
func (erp *ExampleReceiverProducer) StartMetricsReception(host receiver.Host) error {
erp.MetricsStarted = true
return nil
}

// StopMetricsReception tells the receiver that should stop reception,
func (erp *ExampleReceiverProducer) StopMetricsReception() error {
erp.MetricsStopped = true
return nil
}
// This is the map of already created example receivers for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTraceReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one Receiver object per configuration.
var exampleReceivers = map[configmodels.Receiver]*ExampleReceiverProducer{}

// MultiProtoReceiver is for testing purposes. We are defining an example multi protocol
// config and factory for "multireceiver" receiver type.
Expand Down Expand Up @@ -293,7 +312,7 @@ type ExampleExporterConsumer struct {
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (exp *ExampleExporterConsumer) Start(host exporter.Host) error {
func (exp *ExampleExporterConsumer) Start(host component.Host) error {
exp.ExporterStarted = true
return nil
}
Expand Down
19 changes: 2 additions & 17 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,13 @@
package exporter

import (
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
)

// Host represents the entity where the exporter is being hosted. It is used to
// allow communication between the exporter and its host.
type Host interface {
// ReportFatalError is used to report to the host that the exporter encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
ReportFatalError(err error)
}

// Exporter defines functions that trace and metric exporters must implement.
type Exporter interface {
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
Start(host Host) error

// Shutdown is invoked during service shutdown.
Shutdown() error
component.Component
}

// TraceExporter composes TraceConsumer with some additional exporter-specific functions.
Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
Expand All @@ -37,7 +38,7 @@ type metricsExporter struct {

var _ (exporter.MetricsExporter) = (*metricsExporter)(nil)

func (me *metricsExporter) Start(host exporter.Host) error {
func (me *metricsExporter) Start(host component.Host) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/exporterhelper/tracehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
Expand All @@ -37,7 +38,7 @@ type traceExporter struct {

var _ (exporter.TraceExporter) = (*traceExporter)(nil)

func (te *traceExporter) Start(host exporter.Host) error {
func (te *traceExporter) Start(host component.Host) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/exportertest/nop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package exportertest
import (
"context"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
)
Expand All @@ -32,7 +33,7 @@ type nopExporter struct {
var _ exporter.TraceExporter = (*nopExporter)(nil)
var _ exporter.MetricsExporter = (*nopExporter)(nil)

func (ne *nopExporter) Start(host exporter.Host) error {
func (ne *nopExporter) Start(host component.Host) error {
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions exporter/exportertest/sink_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"sync"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
)
Expand All @@ -33,7 +34,7 @@ var _ exporter.TraceExporter = (*SinkTraceExporter)(nil)
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (ste *SinkTraceExporter) Start(host exporter.Host) error {
func (ste *SinkTraceExporter) Start(host component.Host) error {
return nil
}

Expand Down Expand Up @@ -76,7 +77,7 @@ var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil)
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (sme *SinkMetricsExporter) Start(host exporter.Host) error {
func (sme *SinkMetricsExporter) Start(host component.Host) error {
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/compression"
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/config/configgrpc"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/receiver/opencensusreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/receivertest"
"github.com/open-telemetry/opentelemetry-collector/testutils"
)

Expand Down Expand Up @@ -66,8 +66,8 @@ func TestCreateTraceExporter(t *testing.T) {
new(exportertest.SinkTraceExporter))
require.NotNil(t, rcv)
require.Nil(t, err)
require.Nil(t, rcv.StartTraceReception(receivertest.NewMockHost()))
defer rcv.StopTraceReception()
require.Nil(t, rcv.Start(component.NewMockHost()))
defer rcv.Shutdown()

tests := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions exporter/prometheusexporter/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
// official census-ecosystem location, update this import path.
"github.com/orijtech/prometheus-go-metrics-exporter"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
)

Expand All @@ -38,7 +38,7 @@ type prometheusExporter struct {

var _ consumer.MetricsConsumer = (*prometheusExporter)(nil)

func (pe *prometheusExporter) Start(host exporter.Host) error {
func (pe *prometheusExporter) Start(host component.Host) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/zipkinexporter/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/observability"
tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace"
spandatatranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/spandata"
Expand Down Expand Up @@ -160,7 +160,7 @@ func extractStringAttribute(
return value, ok
}

func (ze *zipkinExporter) Start(host exporter.Host) error {
func (ze *zipkinExporter) Start(host component.Host) error {
return nil
}

Expand Down
Loading

0 comments on commit e9e0317

Please sign in to comment.