From e4c19ded11e1efa2ffb940d37c288a0b75f0ab27 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Mon, 8 Jul 2019 15:21:45 -0700 Subject: [PATCH 1/3] Make Zipkin receiver respond to host ingestion status This change makes the Zipkin receiver respond to received data according to the host status regarding ingestion. --- config/configmodels/configmodels.go | 19 +- exporter/zipkinexporter/zipkin_test.go | 3 +- observability/observability.go | 50 ++++++ observability/observability_test.go | 8 + .../observabilitytest/observabilitytest.go | 16 ++ receiver/zipkinreceiver/config_test.go | 9 +- receiver/zipkinreceiver/factory.go | 2 +- receiver/zipkinreceiver/testdata/config.yaml | 1 + receiver/zipkinreceiver/trace_receiver.go | 73 +++++--- .../zipkinreceiver/trace_receiver_test.go | 165 +++++++++++++++++- 10 files changed, 314 insertions(+), 32 deletions(-) diff --git a/config/configmodels/configmodels.go b/config/configmodels/configmodels.go index 8f748d66fc1..56c292efc74 100644 --- a/config/configmodels/configmodels.go +++ b/config/configmodels/configmodels.go @@ -126,13 +126,24 @@ type Pipelines map[string]*Pipeline // These are helper structs which you can embed when implementing your specific // receiver/exporter/processor config storage. +// BackPressureState defines if backpressure should be exterted or not. +type BackPressureState bool + +const ( + // EnableBackPressure indicates that backpressure is enabled. + EnableBackPressure BackPressureState = false + // DisableBackPressure indicates that backpressure is disabled. + DisableBackPressure BackPressureState = true +) + // ReceiverSettings defines common settings for a single-protocol receiver configuration. // Specific receivers can embed this struct and extend it with more fields if needed. type ReceiverSettings struct { - TypeVal string `mapstructure:"-"` - NameVal string `mapstructure:"-"` - Enabled bool `mapstructure:"enabled"` - Endpoint string `mapstructure:"endpoint"` + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` + Enabled bool `mapstructure:"enabled"` + Endpoint string `mapstructure:"endpoint"` + BackPressureState BackPressureState `mapstructure:"disable-backpressure"` } // Name gets the receiver name. diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index 861a1b781e2..4864b375654 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "io" "io/ioutil" "net" @@ -158,7 +159,7 @@ zipkin: // Run the Zipkin receiver to "receive spans upload from a client application" zexp := multiconsumer.NewTraceProcessor(tes) - zi, err := zipkinreceiver.New(":0", zexp) + zi, err := zipkinreceiver.New(":0", configmodels.EnableBackPressure, zexp) if err != nil { t.Fatalf("Failed to create a new Zipkin receiver: %v", err) } diff --git a/observability/observability.go b/observability/observability.go index ad5ec734b9d..9af20ec9d2d 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -31,6 +31,15 @@ import ( ) var ( + mReceiverIngestionBlockedRPCs = stats.Int64( + "oc.io/receiver/ingestion_blocked_rpcs", + "Counts the number of RPCs blocked by the receiver host", + "1") + mReceiverIngestionBlockedRPCsWithDataLoss = stats.Int64( + "oc.io/receiver/ingestion_blocked_silent_data_loss", + "Counts the number of RPCs blocked by the receiver host without back pressure causing data loss", + "1") + mReceiverReceivedSpans = stats.Int64("oc.io/receiver/received_spans", "Counts the number of spans received by the receiver", "1") mReceiverDroppedSpans = stats.Int64("oc.io/receiver/dropped_spans", "Counts the number of spans dropped by the receiver", "1") @@ -44,6 +53,33 @@ var TagKeyReceiver, _ = tag.NewKey("oc_receiver") // TagKeyExporter defines tag key for Exporter. var TagKeyExporter, _ = tag.NewKey("oc_exporter") +// ViewReceiverIngestionBlockedRPCs defines the view for the receiver ingestion +// blocked metric. If it causes data loss or not depends if back pressure is +// enabled and the client has available resources to buffer and retry. +// The metric used by the view does not use number of spans to avoid requiring +// de-serializing the RPC message. +var ViewReceiverIngestionBlockedRPCs = &view.View{ + Name: mReceiverIngestionBlockedRPCs.Name(), + Description: mReceiverIngestionBlockedRPCs.Description(), + Measure: mReceiverIngestionBlockedRPCs, + Aggregation: view.Sum(), + TagKeys: []tag.Key{TagKeyReceiver}, +} + +// ViewReceiverIngestionBlockedRPCsWithDataLoss defines the view for the receiver +// ingestion blocked without back pressure to the client. Since there is no back +// pressure the client will assume that the data was ingested and there will be +// data loss. +// The metric used by the view does not use number of spans to avoid requiring +// de-serializing the RPC message. +var ViewReceiverIngestionBlockedRPCsWithDataLoss = &view.View{ + Name: mReceiverIngestionBlockedRPCsWithDataLoss.Name(), + Description: mReceiverIngestionBlockedRPCsWithDataLoss.Description(), + Measure: mReceiverIngestionBlockedRPCsWithDataLoss, + Aggregation: view.Sum(), + TagKeys: []tag.Key{TagKeyReceiver}, +} + // ViewReceiverReceivedSpans defines the view for the receiver received spans metric. var ViewReceiverReceivedSpans = &view.View{ Name: mReceiverReceivedSpans.Name(), @@ -82,6 +118,8 @@ var ViewExporterDroppedSpans = &view.View{ // AllViews has the views for the metrics provided by the agent. var AllViews = []*view.View{ + ViewReceiverIngestionBlockedRPCs, + ViewReceiverIngestionBlockedRPCsWithDataLoss, ViewReceiverReceivedSpans, ViewReceiverDroppedSpans, ViewExporterReceivedSpans, @@ -96,6 +134,18 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C return ctx } +// RecordIngestionBlockedMetrics records metrics related to the receiver responses +// when the host blocks ingestion. If back pressure is disabled the metric for +// respective data loss is recorded. +// Use it with a context.Context generated using ContextWithReceiverName(). +func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureDisabled bool) { + if backPressureDisabled { + // In this case data loss will happen, record the proper metric. + stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1)) + } + stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCs.M(1)) +} + // RecordTraceReceiverMetrics records the number of the spans received and dropped by the receiver. // Use it with a context.Context generated using ContextWithReceiverName(). func RecordTraceReceiverMetrics(ctxWithTraceReceiverName context.Context, receivedSpans int, droppedSpans int) { diff --git a/observability/observability_test.go b/observability/observability_test.go index 0963c54031e..cc980a1b15c 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -35,11 +35,19 @@ func TestTracePieplineRecordedMetrics(t *testing.T) { receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName) observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13) + observability.RecordIngestionBlockedMetrics(receiverCtx, false) + observability.RecordIngestionBlockedMetrics(receiverCtx, true) exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName) observability.RecordTraceExporterMetrics(exporterCtx, 27, 23) if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil { t.Fatalf("When check recorded values: want nil got %v", err) } + if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCs(receiverName, 2); err != nil { + t.Fatalf("When check recorded values: want nil got %v", err) + } + if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName, 1); err != nil { + t.Fatalf("When check recorded values: want nil got %v", err) + } if err := observabilitytest.CheckValueViewReceiverDroppedSpans(receiverName, 13); err != nil { t.Fatalf("When check recorded values: want nil got %v", err) } diff --git a/observability/observabilitytest/observabilitytest.go b/observability/observabilitytest/observabilitytest.go index b3eabdb6700..907c7b68744 100644 --- a/observability/observabilitytest/observabilitytest.go +++ b/observability/observabilitytest/observabilitytest.go @@ -36,6 +36,22 @@ func SetupRecordedMetricsTest() (doneFn func()) { } } +// CheckValueViewReceiverIngestionBlockedRPCs checks that for the current exported value in the ViewReceiverReceivedSpans +// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". +// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckValueViewReceiverIngestionBlockedRPCs(receiverName string, value int) error { + return checkValueForView(observability.ViewReceiverIngestionBlockedRPCs.Name, + wantsTagsForReceiverView(receiverName), int64(value)) +} + +// CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss checks that for the current exported value in the ViewReceiverReceivedSpans +// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". +// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName string, value int) error { + return checkValueForView(observability.ViewReceiverIngestionBlockedRPCsWithDataLoss.Name, + wantsTagsForReceiverView(receiverName), int64(value)) +} + // CheckValueViewExporterReceivedSpans checks that for the current exported value in the ViewExporterReceivedSpans // for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. diff --git a/receiver/zipkinreceiver/config_test.go b/receiver/zipkinreceiver/config_test.go index a0ef334962b..c8b33ab710b 100644 --- a/receiver/zipkinreceiver/config_test.go +++ b/receiver/zipkinreceiver/config_test.go @@ -45,10 +45,11 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1, &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: typeStr, - NameVal: "zipkin/customname", - Endpoint: "127.0.0.1:8765", - Enabled: true, + TypeVal: typeStr, + NameVal: "zipkin/customname", + Endpoint: "127.0.0.1:8765", + Enabled: true, + BackPressureState: configmodels.DisableBackPressure, }, }) } diff --git a/receiver/zipkinreceiver/factory.go b/receiver/zipkinreceiver/factory.go index e1afab38577..81f8823e903 100644 --- a/receiver/zipkinreceiver/factory.go +++ b/receiver/zipkinreceiver/factory.go @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver( ) (receiver.TraceReceiver, error) { rCfg := cfg.(*Config) - return New(rCfg.Endpoint, nextConsumer) + return New(rCfg.Endpoint, configmodels.EnableBackPressure, nextConsumer) } // CreateMetricsReceiver creates a metrics receiver based on provided config. diff --git a/receiver/zipkinreceiver/testdata/config.yaml b/receiver/zipkinreceiver/testdata/config.yaml index bf6e7af9c88..668ee8098a8 100644 --- a/receiver/zipkinreceiver/testdata/config.yaml +++ b/receiver/zipkinreceiver/testdata/config.yaml @@ -3,6 +3,7 @@ receivers: zipkin/customname: endpoint: "127.0.0.1:8765" enabled: true + disable-backpressure: true processors: exampleprocessor: diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index f36d65bb5de..5d46ea2a534 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -17,10 +17,10 @@ package zipkinreceiver import ( "compress/gzip" "compress/zlib" - "context" "encoding/json" "errors" "fmt" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "io" "io/ioutil" "net" @@ -58,9 +58,10 @@ type ZipkinReceiver struct { mu sync.Mutex // addr is the address onto which the HTTP server will be bound - addr string - - nextConsumer consumer.TraceConsumer + addr string + host receiver.Host + backPressureState configmodels.BackPressureState + nextConsumer consumer.TraceConsumer startOnce sync.Once stopOnce sync.Once @@ -71,14 +72,15 @@ var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil) var _ http.Handler = (*ZipkinReceiver)(nil) // New creates a new zipkinreceiver.ZipkinReceiver reference. -func New(address string, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { +func New(address string, backPressureState configmodels.BackPressureState, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { if nextConsumer == nil { return nil, errNilNextConsumer } zr := &ZipkinReceiver{ - addr: address, - nextConsumer: nextConsumer, + addr: address, + backPressureState: backPressureState, + nextConsumer: nextConsumer, } return zr, nil } @@ -102,6 +104,10 @@ func (zr *ZipkinReceiver) TraceSource() string { // StartTraceReception spins up the receiver's HTTP server and makes the receiver start its processing. func (zr *ZipkinReceiver) StartTraceReception(host receiver.Host) error { + if host == nil { + return errors.New("nil host") + } + zr.mu.Lock() defer zr.mu.Unlock() @@ -114,13 +120,13 @@ func (zr *ZipkinReceiver) StartTraceReception(host receiver.Host) error { return } + zr.host = host server := &http.Server{Handler: zr} + zr.server = server go func() { host.ReportFatalError(server.Serve(ln)) }() - zr.server = server - err = nil }) @@ -288,14 +294,49 @@ const ( // unmarshals them and sends them along to the nextConsumer. func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Trace this method - ctx, span := trace.StartSpan(context.Background(), "ZipkinReceiver.Export") + parentCtx := r.Context() + ctx, span := trace.StartSpan(parentCtx, "ZipkinReceiver.Export") defer span.End() // If the starting RPC has a parent span, then add it as a parent link. - // TODO: parentCtx should be direct parent for the span created here. - parentCtx := r.Context() observability.SetParentLink(parentCtx, span) + // Now deserialize and process the spans. + asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans") + + var receiverTagValue string + if asZipkinv1 { + receiverTagValue = zipkinV1TagValue + } else { + receiverTagValue = zipkinV2TagValue + } + + ctxWithReceiverName := observability.ContextWithReceiverName(ctx, receiverTagValue) + + if !zr.host.OkToIngest() { + var responseStatusCode int + var zPageMessage string + if zr.backPressureState == configmodels.EnableBackPressure { + responseStatusCode = http.StatusServiceUnavailable + zPageMessage = "Host blocked ingestion. Back pressure is ON." + } else { + responseStatusCode = http.StatusAccepted + zPageMessage = "Host blocked ingestion. Back pressure is OFF." + } + + // Internal z-page status does not depend on backpressure setting. + span.SetStatus(trace.Status{ + Code: trace.StatusCodeUnavailable, + Message: zPageMessage, + }) + + observability.RecordIngestionBlockedMetrics( + ctxWithReceiverName, + zr.backPressureState == configmodels.DisableBackPressure) + w.WriteHeader(responseStatusCode) + return + } + pr := processBodyIfNecessary(r) slurp, err := ioutil.ReadAll(pr) if c, ok := pr.(io.Closer); ok { @@ -303,18 +344,11 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { } _ = r.Body.Close() - // Now deserialize and process the spans. - asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans") - var tds []consumerdata.TraceData - - var receiverTagValue string if asZipkinv1 { tds, err = zr.v1ToTraceSpans(slurp, r.Header) - receiverTagValue = zipkinV1TagValue } else { tds, err = zr.v2ToTraceSpans(slurp, r.Header) - receiverTagValue = zipkinV2TagValue } if err != nil { @@ -326,7 +360,6 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - ctxWithReceiverName := observability.ContextWithReceiverName(ctx, receiverTagValue) tdsSize := 0 for _, td := range tds { td.SourceFormat = "zipkin" diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index a86d20d7e90..2a953bc2756 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -17,12 +17,14 @@ package zipkinreceiver import ( "bytes" "fmt" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "io" "io/ioutil" "net" "net/http" "net/http/httptest" "reflect" + "strings" "testing" "time" @@ -32,12 +34,16 @@ import ( openzipkin "github.com/openzipkin/zipkin-go" zipkinmodel "github.com/openzipkin/zipkin-go/model" zhttp "github.com/openzipkin/zipkin-go/reporter/http" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" "github.com/open-telemetry/opentelemetry-service/internal" "github.com/open-telemetry/opentelemetry-service/internal/testutils" + "github.com/open-telemetry/opentelemetry-service/observability/observabilitytest" + "github.com/open-telemetry/opentelemetry-service/receiver" "github.com/open-telemetry/opentelemetry-service/receiver/receivertest" spandatatranslator "github.com/open-telemetry/opentelemetry-service/translator/trace/spandata" ) @@ -136,7 +142,7 @@ func TestNew(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := New(tt.args.address, tt.args.nextConsumer) + got, err := New(tt.args.address, configmodels.EnableBackPressure, tt.args.nextConsumer) if err != tt.wantErr { t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) return @@ -158,7 +164,7 @@ func TestZipkinReceiverPortAlreadyInUse(t *testing.T) { if err != nil { t.Fatalf("failed to split listener address: %v", err) } - traceReceiver, err := New(":"+portStr, exportertest.NewNopTraceExporter()) + traceReceiver, err := New(":"+portStr, configmodels.EnableBackPressure, exportertest.NewNopTraceExporter()) if err != nil { t.Fatalf("Failed to create receiver: %v", err) } @@ -521,3 +527,158 @@ func TestConversionRoundtrip(t *testing.T) { t.Errorf("The roundtrip JSON doesn't match the JSON that we want\nGot:\n%s\nWant:\n%s", gj, wj) } } + +func TestStartTraceReception(t *testing.T) { + tests := []struct { + name string + host receiver.Host + wantErr bool + }{ + { + name: "nil_host", + wantErr: true, + }, + { + name: "valid_host", + host: receivertest.NewMockHost(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(exportertest.SinkTraceExporter) + zr, err := New("127.0.0.1:0", configmodels.DisableBackPressure, sink) + require.Nil(t, err) + require.NotNil(t, zr) + + if err := zr.StartTraceReception(tt.host); (err != nil) != tt.wantErr { + t.Errorf("StartTraceReception error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr { + require.Nil(t, zr.StopTraceReception()) + } + }) + } +} + +func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { + const jsonZipkinData = `[{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db", + "kind": "CLIENT","name": "get", + "timestamp": 1472470996199000,"duration": 207000, + "localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"}, + "remoteEndpoint": {"serviceName": "backend","ipv4": "192.168.99.101","port": 9000}, + "annotations": [ + {"timestamp": 1472470996238000,"value": "foo"}, + {"timestamp": 1472470996403000,"value": "bar"} +], +"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"} +}]` + type ingestionStateTest struct { + okToIngest bool + expectedHTTPStatus int + } + tests := []struct { + name string + backPressureState configmodels.BackPressureState + expectedReceivedBatches int + expectedIngestionBlockedRPCs int + expectedIngestionBlockedRPCsNoBackPressure int + ingestionStates []ingestionStateTest + }{ + { + name: "EnableBackPressure", + backPressureState: configmodels.EnableBackPressure, + expectedReceivedBatches: 2, + expectedIngestionBlockedRPCs: 1, + expectedIngestionBlockedRPCsNoBackPressure: 0, + ingestionStates: []ingestionStateTest{ + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + { + okToIngest: false, + expectedHTTPStatus: http.StatusServiceUnavailable, + }, + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + }, + }, + { + name: "DisableBackPressure", + backPressureState: configmodels.DisableBackPressure, + expectedReceivedBatches: 2, + expectedIngestionBlockedRPCs: 1, + expectedIngestionBlockedRPCsNoBackPressure: 1, + ingestionStates: []ingestionStateTest{ + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + { + okToIngest: false, + expectedHTTPStatus: http.StatusAccepted, + }, + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + doneFn := observabilitytest.SetupRecordedMetricsTest() + defer doneFn() + + sink := new(exportertest.SinkTraceExporter) + zr, err := New("127.0.0.1:0", tt.backPressureState, sink) + require.Nil(t, err) + require.NotNil(t, zr) + defer zr.StopTraceReception() + + host := receivertest.NewMockHost().(*receivertest.MockHost) + err = zr.StartTraceReception(host) + require.Nil(t, err) + + for _, ingestionState := range tt.ingestionStates { + host.SetOkToIngest(ingestionState.okToIngest) + rw := httptest.NewRecorder() + req := httptest.NewRequest( + "POST", + "https://tld.org/", + strings.NewReader(jsonZipkinData)) + zr.ServeHTTP(rw, req) + assert.Equal(t, ingestionState.expectedHTTPStatus, rw.Code) + } + + require.Equal(t, tt.expectedReceivedBatches, len(sink.AllTraces())) + require.Nil( + t, + observabilitytest.CheckValueViewReceiverReceivedSpans( + zipkinV2TagValue, + tt.expectedReceivedBatches), + ) + require.Nil( + t, + observabilitytest.CheckValueViewReceiverIngestionBlockedRPCs( + zipkinV2TagValue, + tt.expectedIngestionBlockedRPCs), + ) + + // This view should only have data if ingestion was blocked and there was no back pressure. + err = observabilitytest.CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss( + zipkinV2TagValue, + tt.expectedIngestionBlockedRPCsNoBackPressure) + if tt.expectedIngestionBlockedRPCsNoBackPressure == 0 { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + }) + } +} From 53789c0888ffa8493c1e0c7ecf3bf094712cf156 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 10 Jul 2019 12:03:18 -0700 Subject: [PATCH 2/3] Nit cleanup --- exporter/zipkinexporter/zipkin_test.go | 2 +- observability/observabilitytest/observabilitytest.go | 4 ++-- receiver/zipkinreceiver/trace_receiver.go | 2 +- receiver/zipkinreceiver/trace_receiver_test.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index 4864b375654..294abcf31e6 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -18,7 +18,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/open-telemetry/opentelemetry-service/config/configmodels" "io" "io/ioutil" "net" @@ -32,6 +31,7 @@ import ( zipkinmodel "github.com/openzipkin/zipkin-go/model" zipkinreporter "github.com/openzipkin/zipkin-go/reporter" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" "github.com/open-telemetry/opentelemetry-service/internal/testutils" "github.com/open-telemetry/opentelemetry-service/processor/multiconsumer" diff --git a/observability/observabilitytest/observabilitytest.go b/observability/observabilitytest/observabilitytest.go index 907c7b68744..41e654c9463 100644 --- a/observability/observabilitytest/observabilitytest.go +++ b/observability/observabilitytest/observabilitytest.go @@ -36,7 +36,7 @@ func SetupRecordedMetricsTest() (doneFn func()) { } } -// CheckValueViewReceiverIngestionBlockedRPCs checks that for the current exported value in the ViewReceiverReceivedSpans +// CheckValueViewReceiverIngestionBlockedRPCs checks that for the current exported value in the ViewReceiverIngestionBlockedRPCs // for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". // In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckValueViewReceiverIngestionBlockedRPCs(receiverName string, value int) error { @@ -44,7 +44,7 @@ func CheckValueViewReceiverIngestionBlockedRPCs(receiverName string, value int) wantsTagsForReceiverView(receiverName), int64(value)) } -// CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss checks that for the current exported value in the ViewReceiverReceivedSpans +// CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss checks that for the current exported value in the ViewReceiverIngestionBlockedRPCsWithDataLoss // for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". // In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName string, value int) error { diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index 5d46ea2a534..a8045cc9ff3 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/open-telemetry/opentelemetry-service/config/configmodels" "io" "io/ioutil" "net" @@ -37,6 +36,7 @@ import ( zipkinproto "github.com/openzipkin/zipkin-go/proto/v2" "go.opencensus.io/trace" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/internal" diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index 2a953bc2756..828c28a424d 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -17,7 +17,6 @@ package zipkinreceiver import ( "bytes" "fmt" - "github.com/open-telemetry/opentelemetry-service/config/configmodels" "io" "io/ioutil" "net" @@ -37,6 +36,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" From 47e082615468b176ec46b41f034cce5032448804 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 10 Jul 2019 20:56:52 -0700 Subject: [PATCH 3/3] Use better type to handle back pressure setting --- config/configmodels/configmodels.go | 26 ++++++++++++------- observability/observability.go | 6 +++-- observability/observability_test.go | 5 ++-- receiver/zipkinreceiver/config_test.go | 10 +++---- receiver/zipkinreceiver/factory.go | 2 +- receiver/zipkinreceiver/factory_test.go | 9 +++++++ receiver/zipkinreceiver/trace_receiver.go | 20 +++++++------- .../zipkinreceiver/trace_receiver_test.go | 8 +++--- 8 files changed, 53 insertions(+), 33 deletions(-) diff --git a/config/configmodels/configmodels.go b/config/configmodels/configmodels.go index 56c292efc74..6e69c4771b4 100644 --- a/config/configmodels/configmodels.go +++ b/config/configmodels/configmodels.go @@ -126,24 +126,24 @@ type Pipelines map[string]*Pipeline // These are helper structs which you can embed when implementing your specific // receiver/exporter/processor config storage. -// BackPressureState defines if backpressure should be exterted or not. -type BackPressureState bool +// BackPressureSetting defines if back pressure should be exerted or not. +type BackPressureSetting int const ( // EnableBackPressure indicates that backpressure is enabled. - EnableBackPressure BackPressureState = false + EnableBackPressure BackPressureSetting = iota // DisableBackPressure indicates that backpressure is disabled. - DisableBackPressure BackPressureState = true + DisableBackPressure ) // ReceiverSettings defines common settings for a single-protocol receiver configuration. // Specific receivers can embed this struct and extend it with more fields if needed. type ReceiverSettings struct { - TypeVal string `mapstructure:"-"` - NameVal string `mapstructure:"-"` - Enabled bool `mapstructure:"enabled"` - Endpoint string `mapstructure:"endpoint"` - BackPressureState BackPressureState `mapstructure:"disable-backpressure"` + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` + Enabled bool `mapstructure:"enabled"` + Endpoint string `mapstructure:"endpoint"` + DisableBackPressure bool `mapstructure:"disable-backpressure"` } // Name gets the receiver name. @@ -166,6 +166,14 @@ func (rs *ReceiverSettings) SetType(typeStr string) { rs.TypeVal = typeStr } +// BackPressureSetting gets the back pressure setting of the configuration. +func (rs *ReceiverSettings) BackPressureSetting() BackPressureSetting { + if rs.DisableBackPressure { + return DisableBackPressure + } + return EnableBackPressure +} + // ExporterSettings defines common settings for an exporter configuration. // Specific exporters can embed this struct and extend it with more fields if needed. type ExporterSettings struct { diff --git a/observability/observability.go b/observability/observability.go index 9af20ec9d2d..ba2285bd565 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -28,6 +28,8 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opencensus.io/trace" + + "github.com/open-telemetry/opentelemetry-service/config/configmodels" ) var ( @@ -138,8 +140,8 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C // when the host blocks ingestion. If back pressure is disabled the metric for // respective data loss is recorded. // Use it with a context.Context generated using ContextWithReceiverName(). -func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureDisabled bool) { - if backPressureDisabled { +func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureSetting configmodels.BackPressureSetting) { + if backPressureSetting == configmodels.DisableBackPressure { // In this case data loss will happen, record the proper metric. stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1)) } diff --git a/observability/observability_test.go b/observability/observability_test.go index cc980a1b15c..7ec2254f5e2 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/observability" "github.com/open-telemetry/opentelemetry-service/observability/observabilitytest" ) @@ -35,8 +36,8 @@ func TestTracePieplineRecordedMetrics(t *testing.T) { receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName) observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13) - observability.RecordIngestionBlockedMetrics(receiverCtx, false) - observability.RecordIngestionBlockedMetrics(receiverCtx, true) + observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.EnableBackPressure) + observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.DisableBackPressure) exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName) observability.RecordTraceExporterMetrics(exporterCtx, 27, 23) if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil { diff --git a/receiver/zipkinreceiver/config_test.go b/receiver/zipkinreceiver/config_test.go index c8b33ab710b..92761e77180 100644 --- a/receiver/zipkinreceiver/config_test.go +++ b/receiver/zipkinreceiver/config_test.go @@ -45,11 +45,11 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1, &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: typeStr, - NameVal: "zipkin/customname", - Endpoint: "127.0.0.1:8765", - Enabled: true, - BackPressureState: configmodels.DisableBackPressure, + TypeVal: typeStr, + NameVal: "zipkin/customname", + Endpoint: "127.0.0.1:8765", + Enabled: true, + DisableBackPressure: true, }, }) } diff --git a/receiver/zipkinreceiver/factory.go b/receiver/zipkinreceiver/factory.go index 81f8823e903..d5bf35766ed 100644 --- a/receiver/zipkinreceiver/factory.go +++ b/receiver/zipkinreceiver/factory.go @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver( ) (receiver.TraceReceiver, error) { rCfg := cfg.(*Config) - return New(rCfg.Endpoint, configmodels.EnableBackPressure, nextConsumer) + return New(rCfg.Endpoint, rCfg.BackPressureSetting(), nextConsumer) } // CreateMetricsReceiver creates a metrics receiver based on provided config. diff --git a/receiver/zipkinreceiver/factory_test.go b/receiver/zipkinreceiver/factory_test.go index 1ddd161be0b..bbdb4daff60 100644 --- a/receiver/zipkinreceiver/factory_test.go +++ b/receiver/zipkinreceiver/factory_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/open-telemetry/opentelemetry-service/config/configerror" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/receiver" ) @@ -47,6 +48,14 @@ func TestCreateReceiver(t *testing.T) { tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{}) assert.Nil(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") + assert.Equal(t, configmodels.EnableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting) + + rCfg := cfg.(*Config) + rCfg.DisableBackPressure = true + tReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{}) + assert.Nil(t, err, "receiver creation failed") + assert.NotNil(t, tReceiver, "receiver creation failed") + assert.Equal(t, configmodels.DisableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting) mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil) assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index a8045cc9ff3..ce2fb8f0f14 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -58,10 +58,10 @@ type ZipkinReceiver struct { mu sync.Mutex // addr is the address onto which the HTTP server will be bound - addr string - host receiver.Host - backPressureState configmodels.BackPressureState - nextConsumer consumer.TraceConsumer + addr string + host receiver.Host + backPressureSetting configmodels.BackPressureSetting + nextConsumer consumer.TraceConsumer startOnce sync.Once stopOnce sync.Once @@ -72,15 +72,15 @@ var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil) var _ http.Handler = (*ZipkinReceiver)(nil) // New creates a new zipkinreceiver.ZipkinReceiver reference. -func New(address string, backPressureState configmodels.BackPressureState, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { +func New(address string, backPressureSetting configmodels.BackPressureSetting, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { if nextConsumer == nil { return nil, errNilNextConsumer } zr := &ZipkinReceiver{ - addr: address, - backPressureState: backPressureState, - nextConsumer: nextConsumer, + addr: address, + backPressureSetting: backPressureSetting, + nextConsumer: nextConsumer, } return zr, nil } @@ -316,7 +316,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !zr.host.OkToIngest() { var responseStatusCode int var zPageMessage string - if zr.backPressureState == configmodels.EnableBackPressure { + if zr.backPressureSetting == configmodels.EnableBackPressure { responseStatusCode = http.StatusServiceUnavailable zPageMessage = "Host blocked ingestion. Back pressure is ON." } else { @@ -332,7 +332,7 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { observability.RecordIngestionBlockedMetrics( ctxWithReceiverName, - zr.backPressureState == configmodels.DisableBackPressure) + zr.backPressureSetting) w.WriteHeader(responseStatusCode) return } diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index 828c28a424d..0276791d1dc 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -580,7 +580,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { } tests := []struct { name string - backPressureState configmodels.BackPressureState + backPressureSetting configmodels.BackPressureSetting expectedReceivedBatches int expectedIngestionBlockedRPCs int expectedIngestionBlockedRPCsNoBackPressure int @@ -588,7 +588,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { }{ { name: "EnableBackPressure", - backPressureState: configmodels.EnableBackPressure, + backPressureSetting: configmodels.EnableBackPressure, expectedReceivedBatches: 2, expectedIngestionBlockedRPCs: 1, expectedIngestionBlockedRPCsNoBackPressure: 0, @@ -609,7 +609,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { }, { name: "DisableBackPressure", - backPressureState: configmodels.DisableBackPressure, + backPressureSetting: configmodels.DisableBackPressure, expectedReceivedBatches: 2, expectedIngestionBlockedRPCs: 1, expectedIngestionBlockedRPCsNoBackPressure: 1, @@ -636,7 +636,7 @@ func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { defer doneFn() sink := new(exportertest.SinkTraceExporter) - zr, err := New("127.0.0.1:0", tt.backPressureState, sink) + zr, err := New("127.0.0.1:0", tt.backPressureSetting, sink) require.Nil(t, err) require.NotNil(t, zr) defer zr.StopTraceReception()