Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Zipkin receiver behavior according to host ingestion status #148

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions config/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,24 @@ type Pipelines map[string]*Pipeline
// These are helper structs which you can embed when implementing your specific
// receiver/exporter/processor config storage.

// BackPressureSetting defines if back pressure should be exerted or not.
type BackPressureSetting int

const (
// EnableBackPressure indicates that backpressure is enabled.
EnableBackPressure BackPressureSetting = iota
// DisableBackPressure indicates that backpressure is disabled.
DisableBackPressure
)

// ReceiverSettings defines common settings for a single-protocol receiver configuration.
// Specific receivers can embed this struct and extend it with more fields if needed.
type ReceiverSettings struct {
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
Endpoint string `mapstructure:"endpoint"`
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Enabled bool `mapstructure:"enabled"`
Endpoint string `mapstructure:"endpoint"`
DisableBackPressure bool `mapstructure:"disable-backpressure"`
}

// Name gets the receiver name.
Expand All @@ -155,6 +166,14 @@ func (rs *ReceiverSettings) SetType(typeStr string) {
rs.TypeVal = typeStr
}

// BackPressureSetting gets the back pressure setting of the configuration.
func (rs *ReceiverSettings) BackPressureSetting() BackPressureSetting {
if rs.DisableBackPressure {
return DisableBackPressure
}
return EnableBackPressure
}

// ExporterSettings defines common settings for an exporter configuration.
// Specific exporters can embed this struct and extend it with more fields if needed.
type ExporterSettings struct {
Expand Down
3 changes: 2 additions & 1 deletion exporter/zipkinexporter/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinreporter "github.com/openzipkin/zipkin-go/reporter"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/internal/config/viperutils"
"github.com/open-telemetry/opentelemetry-service/internal/testutils"
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
Expand Down Expand Up @@ -158,7 +159,7 @@ zipkin:

// Run the Zipkin receiver to "receive spans upload from a client application"
zexp := multiconsumer.NewTraceProcessor(tes)
zi, err := zipkinreceiver.New(":0", zexp)
zi, err := zipkinreceiver.New(":0", configmodels.EnableBackPressure, zexp)
if err != nil {
t.Fatalf("Failed to create a new Zipkin receiver: %v", err)
}
Expand Down
52 changes: 52 additions & 0 deletions observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)

var (
mReceiverIngestionBlockedRPCs = stats.Int64(
"oc.io/receiver/ingestion_blocked_rpcs",
"Counts the number of RPCs blocked by the receiver host",
"1")
mReceiverIngestionBlockedRPCsWithDataLoss = stats.Int64(
"oc.io/receiver/ingestion_blocked_silent_data_loss",
"Counts the number of RPCs blocked by the receiver host without back pressure causing data loss",
"1")

mReceiverReceivedSpans = stats.Int64("oc.io/receiver/received_spans", "Counts the number of spans received by the receiver", "1")
mReceiverDroppedSpans = stats.Int64("oc.io/receiver/dropped_spans", "Counts the number of spans dropped by the receiver", "1")

Expand All @@ -44,6 +55,33 @@ var TagKeyReceiver, _ = tag.NewKey("oc_receiver")
// TagKeyExporter defines tag key for Exporter.
var TagKeyExporter, _ = tag.NewKey("oc_exporter")

// ViewReceiverIngestionBlockedRPCs defines the view for the receiver ingestion
// blocked metric. If it causes data loss or not depends if back pressure is
// enabled and the client has available resources to buffer and retry.
// The metric used by the view does not use number of spans to avoid requiring
// de-serializing the RPC message.
var ViewReceiverIngestionBlockedRPCs = &view.View{
Name: mReceiverIngestionBlockedRPCs.Name(),
Description: mReceiverIngestionBlockedRPCs.Description(),
Measure: mReceiverIngestionBlockedRPCs,
Aggregation: view.Sum(),
TagKeys: []tag.Key{TagKeyReceiver},
}

// ViewReceiverIngestionBlockedRPCsWithDataLoss defines the view for the receiver
// ingestion blocked without back pressure to the client. Since there is no back
// pressure the client will assume that the data was ingested and there will be
// data loss.
// The metric used by the view does not use number of spans to avoid requiring
// de-serializing the RPC message.
var ViewReceiverIngestionBlockedRPCsWithDataLoss = &view.View{
Name: mReceiverIngestionBlockedRPCsWithDataLoss.Name(),
Description: mReceiverIngestionBlockedRPCsWithDataLoss.Description(),
Measure: mReceiverIngestionBlockedRPCsWithDataLoss,
Aggregation: view.Sum(),
TagKeys: []tag.Key{TagKeyReceiver},
}

// ViewReceiverReceivedSpans defines the view for the receiver received spans metric.
var ViewReceiverReceivedSpans = &view.View{
Name: mReceiverReceivedSpans.Name(),
Expand Down Expand Up @@ -82,6 +120,8 @@ var ViewExporterDroppedSpans = &view.View{

// AllViews has the views for the metrics provided by the agent.
var AllViews = []*view.View{
ViewReceiverIngestionBlockedRPCs,
ViewReceiverIngestionBlockedRPCsWithDataLoss,
ViewReceiverReceivedSpans,
ViewReceiverDroppedSpans,
ViewExporterReceivedSpans,
Expand All @@ -96,6 +136,18 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C
return ctx
}

// RecordIngestionBlockedMetrics records metrics related to the receiver responses
// when the host blocks ingestion. If back pressure is disabled the metric for
// respective data loss is recorded.
// Use it with a context.Context generated using ContextWithReceiverName().
func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureSetting configmodels.BackPressureSetting) {
if backPressureSetting == configmodels.DisableBackPressure {
// In this case data loss will happen, record the proper metric.
stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1))
}
stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCs.M(1))
}

// RecordTraceReceiverMetrics records the number of the spans received and dropped by the receiver.
// Use it with a context.Context generated using ContextWithReceiverName().
func RecordTraceReceiverMetrics(ctxWithTraceReceiverName context.Context, receivedSpans int, droppedSpans int) {
Expand Down
9 changes: 9 additions & 0 deletions observability/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/observability/observabilitytest"
)
Expand All @@ -35,11 +36,19 @@ func TestTracePieplineRecordedMetrics(t *testing.T) {

receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName)
observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13)
observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.EnableBackPressure)
observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.DisableBackPressure)
exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName)
observability.RecordTraceExporterMetrics(exporterCtx, 27, 23)
if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCs(receiverName, 2); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName, 1); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
if err := observabilitytest.CheckValueViewReceiverDroppedSpans(receiverName, 13); err != nil {
t.Fatalf("When check recorded values: want nil got %v", err)
}
Expand Down
16 changes: 16 additions & 0 deletions observability/observabilitytest/observabilitytest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func SetupRecordedMetricsTest() (doneFn func()) {
}
}

// CheckValueViewReceiverIngestionBlockedRPCs checks that for the current exported value in the ViewReceiverIngestionBlockedRPCs
// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value".
// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckValueViewReceiverIngestionBlockedRPCs(receiverName string, value int) error {
return checkValueForView(observability.ViewReceiverIngestionBlockedRPCs.Name,
wantsTagsForReceiverView(receiverName), int64(value))
}

// CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss checks that for the current exported value in the ViewReceiverIngestionBlockedRPCsWithDataLoss
// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value".
// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName string, value int) error {
return checkValueForView(observability.ViewReceiverIngestionBlockedRPCsWithDataLoss.Name,
wantsTagsForReceiverView(receiverName), int64(value))
}

// CheckValueViewExporterReceivedSpans checks that for the current exported value in the ViewExporterReceivedSpans
// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value".
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
Expand Down
9 changes: 5 additions & 4 deletions receiver/zipkinreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, r1,
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "zipkin/customname",
Endpoint: "127.0.0.1:8765",
Enabled: true,
TypeVal: typeStr,
NameVal: "zipkin/customname",
Endpoint: "127.0.0.1:8765",
Enabled: true,
DisableBackPressure: true,
},
})
}
2 changes: 1 addition & 1 deletion receiver/zipkinreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver(
) (receiver.TraceReceiver, error) {

rCfg := cfg.(*Config)
return New(rCfg.Endpoint, nextConsumer)
return New(rCfg.Endpoint, rCfg.BackPressureSetting(), nextConsumer)
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
Expand Down
9 changes: 9 additions & 0 deletions receiver/zipkinreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-service/config/configerror"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
Expand All @@ -47,6 +48,14 @@ func TestCreateReceiver(t *testing.T) {
tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.Equal(t, configmodels.EnableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting)

rCfg := cfg.(*Config)
rCfg.DisableBackPressure = true
tReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{})
assert.Nil(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")
assert.Equal(t, configmodels.DisableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting)

mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil)
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
Expand Down
1 change: 1 addition & 0 deletions receiver/zipkinreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ receivers:
zipkin/customname:
endpoint: "127.0.0.1:8765"
enabled: true
disable-backpressure: true

processors:
exampleprocessor:
Expand Down
Loading