Skip to content

Commit

Permalink
Rename fanoutprocessor to FanOutConnector (#285)
Browse files Browse the repository at this point in the history
* Rename fanoutprocessor to FanOutConnector

This implements #283

* PR fixes
  • Loading branch information
tigrannajaryan authored and Paulo Janotti committed Aug 23, 2019
1 parent 55ebc45 commit 99ab342
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 39 deletions.
4 changes: 2 additions & 2 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Pipelines can operate on 2 telemetry data types: traces and metrics. The data ty

![Pipelines](images/design-pipelines.png)

There can be one or more receivers in a pipeline. Data from all receivers is pushed to the first processor, which performs a processing on it and then pushes it to the next processor (or it may drop the data, e.g. if it is a “sampling” processor) and so on until the last processor in the pipeline pushes the data to the exporters. Each exporter gets a copy of each data element. The last processor uses a `fanoutprocessor` to fanout the data to multiple exporters.
There can be one or more receivers in a pipeline. Data from all receivers is pushed to the first processor, which performs a processing on it and then pushes it to the next processor (or it may drop the data, e.g. if it is a “sampling” processor) and so on until the last processor in the pipeline pushes the data to the exporters. Each exporter gets a copy of each data element. The last processor uses a `FanOutConnector` to fan out the data to multiple exporters.

The pipeline is constructed during Service startup based on pipeline definition in the config file.

Expand Down Expand Up @@ -63,7 +63,7 @@ When the Service loads this config the result will look like this (part of proce
![Receivers](images/design-receivers.png)
Important: when the same receiver is referenced in more than one pipeline the Service will create only one receiver instance at runtime that will send the data to `fanoutprocessor` which in turn will send the data to the first processor of each pipeline. The data propagation from receiver to `fanoutprocessor` and then to processors is via synchronous function call. This means that if one processor blocks the call the other pipelines that are attached to this receiver will be blocked from receiving the same data and the receiver itself will stop processing and forwarding newly received data.
Important: when the same receiver is referenced in more than one pipeline the Service will create only one receiver instance at runtime that will send the data to `FanOutConnector` which in turn will send the data to the first processor of each pipeline. The data propagation from receiver to `FanOutConnector` and then to processors is via synchronous function call. This means that if one processor blocks the call the other pipelines that are attached to this receiver will be blocked from receiving the same data and the receiver itself will stop processing and forwarding newly received data.
### Exporters
Expand Down
4 changes: 2 additions & 2 deletions exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"github.com/open-telemetry/opentelemetry-service/internal/config/viperutils"
"github.com/open-telemetry/opentelemetry-service/internal/testutils"
"github.com/open-telemetry/opentelemetry-service/processor/fanoutprocessor"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver/receivertest"
"github.com/open-telemetry/opentelemetry-service/receiver/zipkinreceiver"
)
Expand Down Expand Up @@ -157,7 +157,7 @@ zipkin:
tes[0].(*zipkinExporter).reporter = mzr

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := fanoutprocessor.NewTraceProcessor(tes)
zexp := processor.NewTraceFanOutConnector(tes)
zi, err := zipkinreceiver.New(":0", zexp)
if err != nil {
t.Fatalf("Failed to create a new Zipkin receiver: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,53 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package fanoutprocessor contains implementations of Trace/Metrics processors
// that fan out to multiple other processors.
package fanoutprocessor
package processor

import (
"context"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/oterr"
"github.com/open-telemetry/opentelemetry-service/processor"
)

// NewMetricsProcessor wraps multiple metrics consumers in a single one.
func NewMetricsProcessor(mcs []consumer.MetricsConsumer) processor.MetricsProcessor {
return metricsConsumers(mcs)
// This file contains implementations of Trace/Metrics connectors
// that fan out the data to multiple other consumers.

// NewMetricsFanOutConnector wraps multiple metrics consumers in a single one.
func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) MetricsProcessor {
return metricsFanOutConnector(mcs)
}

type metricsConsumers []consumer.MetricsConsumer
type metricsFanOutConnector []consumer.MetricsConsumer

var _ processor.MetricsProcessor = (*metricsConsumers)(nil)
var _ MetricsProcessor = (*metricsFanOutConnector)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mcs metricsConsumers) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
var errs []error
for _, mdp := range mcs {
if err := mdp.ConsumeMetricsData(ctx, md); err != nil {
for _, mc := range mfc {
if err := mc.ConsumeMetricsData(ctx, md); err != nil {
errs = append(errs, err)
}
}
return oterr.CombineErrors(errs)
}

// NewTraceProcessor wraps multiple trace consumers in a single one.
func NewTraceProcessor(tcs []consumer.TraceConsumer) processor.TraceProcessor {
return traceConsumers(tcs)
// NewTraceFanOutConnector wraps multiple trace consumers in a single one.
func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) TraceProcessor {
return traceFanOutConnector(tcs)
}

type traceConsumers []consumer.TraceConsumer
type traceFanOutConnector []consumer.TraceConsumer

var _ processor.TraceProcessor = (*traceConsumers)(nil)
var _ TraceProcessor = (*traceFanOutConnector)(nil)

// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
func (tcs traceConsumers) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
var errs []error
for _, tdp := range tcs {
if err := tdp.ConsumeTraceData(ctx, td); err != nil {
for _, tc := range tfc {
if err := tc.ConsumeTraceData(ctx, td); err != nil {
errs = append(errs, err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package fanoutprocessor
package processor

import (
"context"
Expand All @@ -31,15 +31,15 @@ func TestTraceProcessorMultiplexing(t *testing.T) {
processors[i] = &mockTraceConsumer{}
}

tdp := NewTraceProcessor(processors)
tfc := NewTraceFanOutConnector(processors)
td := consumerdata.TraceData{
Spans: make([]*tracepb.Span, 7),
}

var wantSpansCount = 0
for i := 0; i < 2; i++ {
wantSpansCount += len(td.Spans)
err := tdp.ConsumeTraceData(context.Background(), td)
err := tfc.ConsumeTraceData(context.Background(), td)
if err != nil {
t.Errorf("Wanted nil got error")
return
Expand All @@ -64,15 +64,15 @@ func TestTraceProcessorWhenOneErrors(t *testing.T) {
// Make one processor return error
processors[1].(*mockTraceConsumer).MustFail = true

tdp := NewTraceProcessor(processors)
tfc := NewTraceFanOutConnector(processors)
td := consumerdata.TraceData{
Spans: make([]*tracepb.Span, 5),
}

var wantSpansCount = 0
for i := 0; i < 2; i++ {
wantSpansCount += len(td.Spans)
err := tdp.ConsumeTraceData(context.Background(), td)
err := tfc.ConsumeTraceData(context.Background(), td)
if err == nil {
t.Errorf("Wanted error got nil")
return
Expand All @@ -94,15 +94,15 @@ func TestMetricsProcessorMultiplexing(t *testing.T) {
processors[i] = &mockMetricsConsumer{}
}

mdp := NewMetricsProcessor(processors)
mfc := NewMetricsFanOutConnector(processors)
md := consumerdata.MetricsData{
Metrics: make([]*metricspb.Metric, 7),
}

var wantMetricsCount = 0
for i := 0; i < 2; i++ {
wantMetricsCount += len(md.Metrics)
err := mdp.ConsumeMetricsData(context.Background(), md)
err := mfc.ConsumeMetricsData(context.Background(), md)
if err != nil {
t.Errorf("Wanted nil got error")
return
Expand All @@ -127,15 +127,15 @@ func TestMetricsProcessorWhenOneErrors(t *testing.T) {
// Make one processor return error
processors[1].(*mockMetricsConsumer).MustFail = true

mdp := NewMetricsProcessor(processors)
mfc := NewMetricsFanOutConnector(processors)
md := consumerdata.MetricsData{
Metrics: make([]*metricspb.Metric, 5),
}

var wantMetricsCount = 0
for i := 0; i < 2; i++ {
wantMetricsCount += len(md.Metrics)
err := mdp.ConsumeMetricsData(context.Background(), md)
err := mfc.ConsumeMetricsData(context.Background(), md)
if err == nil {
t.Errorf("Wanted error got nil")
return
Expand Down
5 changes: 2 additions & 3 deletions service/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/processor/fanoutprocessor"
)

// builtProcessor is a processor that is built based on a config.
Expand Down Expand Up @@ -147,7 +146,7 @@ func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st
}

// Create a junction point that fans out to all exporters.
return fanoutprocessor.NewTraceProcessor(exporters)
return processor.NewTraceFanOutConnector(exporters)
}

func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer {
Expand All @@ -164,5 +163,5 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []
}

// Create a junction point that fans out to all exporters.
return fanoutprocessor.NewMetricsProcessor(exporters)
return processor.NewMetricsFanOutConnector(exporters)
}
6 changes: 3 additions & 3 deletions service/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/oterr"
"github.com/open-telemetry/opentelemetry-service/processor/fanoutprocessor"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
)

Expand Down Expand Up @@ -260,7 +260,7 @@ func buildFanoutTraceConsumer(pipelineFrontProcessors []*builtProcessor) consume
}

// Create a junction point that fans out to all pipelines.
return fanoutprocessor.NewTraceProcessor(pipelineConsumers)
return processor.NewTraceFanOutConnector(pipelineConsumers)
}

func buildFanoutMetricConsumer(pipelineFrontProcessors []*builtProcessor) consumer.MetricsConsumer {
Expand All @@ -275,5 +275,5 @@ func buildFanoutMetricConsumer(pipelineFrontProcessors []*builtProcessor) consum
}

// Create a junction point that fans out to all pipelines.
return fanoutprocessor.NewMetricsProcessor(pipelineConsumers)
return processor.NewMetricsFanOutConnector(pipelineConsumers)
}

0 comments on commit 99ab342

Please sign in to comment.