diff --git a/config/configmodels/configmodels.go b/config/configmodels/configmodels.go index 8f748d66fc1..6e69c4771b4 100644 --- a/config/configmodels/configmodels.go +++ b/config/configmodels/configmodels.go @@ -126,13 +126,24 @@ type Pipelines map[string]*Pipeline // These are helper structs which you can embed when implementing your specific // receiver/exporter/processor config storage. +// BackPressureSetting defines if back pressure should be exerted or not. +type BackPressureSetting int + +const ( + // EnableBackPressure indicates that backpressure is enabled. + EnableBackPressure BackPressureSetting = iota + // DisableBackPressure indicates that backpressure is disabled. + DisableBackPressure +) + // ReceiverSettings defines common settings for a single-protocol receiver configuration. // Specific receivers can embed this struct and extend it with more fields if needed. type ReceiverSettings struct { - TypeVal string `mapstructure:"-"` - NameVal string `mapstructure:"-"` - Enabled bool `mapstructure:"enabled"` - Endpoint string `mapstructure:"endpoint"` + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` + Enabled bool `mapstructure:"enabled"` + Endpoint string `mapstructure:"endpoint"` + DisableBackPressure bool `mapstructure:"disable-backpressure"` } // Name gets the receiver name. @@ -155,6 +166,14 @@ func (rs *ReceiverSettings) SetType(typeStr string) { rs.TypeVal = typeStr } +// BackPressureSetting gets the back pressure setting of the configuration. +func (rs *ReceiverSettings) BackPressureSetting() BackPressureSetting { + if rs.DisableBackPressure { + return DisableBackPressure + } + return EnableBackPressure +} + // ExporterSettings defines common settings for an exporter configuration. // Specific exporters can embed this struct and extend it with more fields if needed. type ExporterSettings struct { diff --git a/exporter/zipkinexporter/zipkin_test.go b/exporter/zipkinexporter/zipkin_test.go index 861a1b781e2..294abcf31e6 100644 --- a/exporter/zipkinexporter/zipkin_test.go +++ b/exporter/zipkinexporter/zipkin_test.go @@ -31,6 +31,7 @@ import ( zipkinmodel "github.com/openzipkin/zipkin-go/model" zipkinreporter "github.com/openzipkin/zipkin-go/reporter" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/internal/config/viperutils" "github.com/open-telemetry/opentelemetry-service/internal/testutils" "github.com/open-telemetry/opentelemetry-service/processor/multiconsumer" @@ -158,7 +159,7 @@ zipkin: // Run the Zipkin receiver to "receive spans upload from a client application" zexp := multiconsumer.NewTraceProcessor(tes) - zi, err := zipkinreceiver.New(":0", zexp) + zi, err := zipkinreceiver.New(":0", configmodels.EnableBackPressure, zexp) if err != nil { t.Fatalf("Failed to create a new Zipkin receiver: %v", err) } diff --git a/observability/observability.go b/observability/observability.go index ad5ec734b9d..ba2285bd565 100644 --- a/observability/observability.go +++ b/observability/observability.go @@ -28,9 +28,20 @@ import ( "go.opencensus.io/stats/view" "go.opencensus.io/tag" "go.opencensus.io/trace" + + "github.com/open-telemetry/opentelemetry-service/config/configmodels" ) var ( + mReceiverIngestionBlockedRPCs = stats.Int64( + "oc.io/receiver/ingestion_blocked_rpcs", + "Counts the number of RPCs blocked by the receiver host", + "1") + mReceiverIngestionBlockedRPCsWithDataLoss = stats.Int64( + "oc.io/receiver/ingestion_blocked_silent_data_loss", + "Counts the number of RPCs blocked by the receiver host without back pressure causing data loss", + "1") + mReceiverReceivedSpans = stats.Int64("oc.io/receiver/received_spans", "Counts the number of spans received by the receiver", "1") mReceiverDroppedSpans = stats.Int64("oc.io/receiver/dropped_spans", "Counts the number of spans dropped by the receiver", "1") @@ -44,6 +55,33 @@ var TagKeyReceiver, _ = tag.NewKey("oc_receiver") // TagKeyExporter defines tag key for Exporter. var TagKeyExporter, _ = tag.NewKey("oc_exporter") +// ViewReceiverIngestionBlockedRPCs defines the view for the receiver ingestion +// blocked metric. If it causes data loss or not depends if back pressure is +// enabled and the client has available resources to buffer and retry. +// The metric used by the view does not use number of spans to avoid requiring +// de-serializing the RPC message. +var ViewReceiverIngestionBlockedRPCs = &view.View{ + Name: mReceiverIngestionBlockedRPCs.Name(), + Description: mReceiverIngestionBlockedRPCs.Description(), + Measure: mReceiverIngestionBlockedRPCs, + Aggregation: view.Sum(), + TagKeys: []tag.Key{TagKeyReceiver}, +} + +// ViewReceiverIngestionBlockedRPCsWithDataLoss defines the view for the receiver +// ingestion blocked without back pressure to the client. Since there is no back +// pressure the client will assume that the data was ingested and there will be +// data loss. +// The metric used by the view does not use number of spans to avoid requiring +// de-serializing the RPC message. +var ViewReceiverIngestionBlockedRPCsWithDataLoss = &view.View{ + Name: mReceiverIngestionBlockedRPCsWithDataLoss.Name(), + Description: mReceiverIngestionBlockedRPCsWithDataLoss.Description(), + Measure: mReceiverIngestionBlockedRPCsWithDataLoss, + Aggregation: view.Sum(), + TagKeys: []tag.Key{TagKeyReceiver}, +} + // ViewReceiverReceivedSpans defines the view for the receiver received spans metric. var ViewReceiverReceivedSpans = &view.View{ Name: mReceiverReceivedSpans.Name(), @@ -82,6 +120,8 @@ var ViewExporterDroppedSpans = &view.View{ // AllViews has the views for the metrics provided by the agent. var AllViews = []*view.View{ + ViewReceiverIngestionBlockedRPCs, + ViewReceiverIngestionBlockedRPCsWithDataLoss, ViewReceiverReceivedSpans, ViewReceiverDroppedSpans, ViewExporterReceivedSpans, @@ -96,6 +136,18 @@ func ContextWithReceiverName(ctx context.Context, receiverName string) context.C return ctx } +// RecordIngestionBlockedMetrics records metrics related to the receiver responses +// when the host blocks ingestion. If back pressure is disabled the metric for +// respective data loss is recorded. +// Use it with a context.Context generated using ContextWithReceiverName(). +func RecordIngestionBlockedMetrics(ctxWithTraceReceiverName context.Context, backPressureSetting configmodels.BackPressureSetting) { + if backPressureSetting == configmodels.DisableBackPressure { + // In this case data loss will happen, record the proper metric. + stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCsWithDataLoss.M(1)) + } + stats.Record(ctxWithTraceReceiverName, mReceiverIngestionBlockedRPCs.M(1)) +} + // RecordTraceReceiverMetrics records the number of the spans received and dropped by the receiver. // Use it with a context.Context generated using ContextWithReceiverName(). func RecordTraceReceiverMetrics(ctxWithTraceReceiverName context.Context, receivedSpans int, droppedSpans int) { diff --git a/observability/observability_test.go b/observability/observability_test.go index 0963c54031e..7ec2254f5e2 100644 --- a/observability/observability_test.go +++ b/observability/observability_test.go @@ -20,6 +20,7 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/observability" "github.com/open-telemetry/opentelemetry-service/observability/observabilitytest" ) @@ -35,11 +36,19 @@ func TestTracePieplineRecordedMetrics(t *testing.T) { receiverCtx := observability.ContextWithReceiverName(context.Background(), receiverName) observability.RecordTraceReceiverMetrics(receiverCtx, 17, 13) + observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.EnableBackPressure) + observability.RecordIngestionBlockedMetrics(receiverCtx, configmodels.DisableBackPressure) exporterCtx := observability.ContextWithExporterName(receiverCtx, exporterName) observability.RecordTraceExporterMetrics(exporterCtx, 27, 23) if err := observabilitytest.CheckValueViewReceiverReceivedSpans(receiverName, 17); err != nil { t.Fatalf("When check recorded values: want nil got %v", err) } + if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCs(receiverName, 2); err != nil { + t.Fatalf("When check recorded values: want nil got %v", err) + } + if err := observabilitytest.CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName, 1); err != nil { + t.Fatalf("When check recorded values: want nil got %v", err) + } if err := observabilitytest.CheckValueViewReceiverDroppedSpans(receiverName, 13); err != nil { t.Fatalf("When check recorded values: want nil got %v", err) } diff --git a/observability/observabilitytest/observabilitytest.go b/observability/observabilitytest/observabilitytest.go index b3eabdb6700..41e654c9463 100644 --- a/observability/observabilitytest/observabilitytest.go +++ b/observability/observabilitytest/observabilitytest.go @@ -36,6 +36,22 @@ func SetupRecordedMetricsTest() (doneFn func()) { } } +// CheckValueViewReceiverIngestionBlockedRPCs checks that for the current exported value in the ViewReceiverIngestionBlockedRPCs +// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". +// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckValueViewReceiverIngestionBlockedRPCs(receiverName string, value int) error { + return checkValueForView(observability.ViewReceiverIngestionBlockedRPCs.Name, + wantsTagsForReceiverView(receiverName), int64(value)) +} + +// CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss checks that for the current exported value in the ViewReceiverIngestionBlockedRPCsWithDataLoss +// for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". +// In tests that this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss(receiverName string, value int) error { + return checkValueForView(observability.ViewReceiverIngestionBlockedRPCsWithDataLoss.Name, + wantsTagsForReceiverView(receiverName), int64(value)) +} + // CheckValueViewExporterReceivedSpans checks that for the current exported value in the ViewExporterReceivedSpans // for {TagKeyReceiver: receiverName, TagKeyExporter: exporterTagName} is equal to "value". // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. diff --git a/receiver/zipkinreceiver/config_test.go b/receiver/zipkinreceiver/config_test.go index a0ef334962b..92761e77180 100644 --- a/receiver/zipkinreceiver/config_test.go +++ b/receiver/zipkinreceiver/config_test.go @@ -45,10 +45,11 @@ func TestLoadConfig(t *testing.T) { assert.Equal(t, r1, &Config{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: typeStr, - NameVal: "zipkin/customname", - Endpoint: "127.0.0.1:8765", - Enabled: true, + TypeVal: typeStr, + NameVal: "zipkin/customname", + Endpoint: "127.0.0.1:8765", + Enabled: true, + DisableBackPressure: true, }, }) } diff --git a/receiver/zipkinreceiver/factory.go b/receiver/zipkinreceiver/factory.go index e1afab38577..d5bf35766ed 100644 --- a/receiver/zipkinreceiver/factory.go +++ b/receiver/zipkinreceiver/factory.go @@ -70,7 +70,7 @@ func (f *factory) CreateTraceReceiver( ) (receiver.TraceReceiver, error) { rCfg := cfg.(*Config) - return New(rCfg.Endpoint, nextConsumer) + return New(rCfg.Endpoint, rCfg.BackPressureSetting(), nextConsumer) } // CreateMetricsReceiver creates a metrics receiver based on provided config. diff --git a/receiver/zipkinreceiver/factory_test.go b/receiver/zipkinreceiver/factory_test.go index 1ddd161be0b..bbdb4daff60 100644 --- a/receiver/zipkinreceiver/factory_test.go +++ b/receiver/zipkinreceiver/factory_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/open-telemetry/opentelemetry-service/config/configerror" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/receiver" ) @@ -47,6 +48,14 @@ func TestCreateReceiver(t *testing.T) { tReceiver, err := factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{}) assert.Nil(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") + assert.Equal(t, configmodels.EnableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting) + + rCfg := cfg.(*Config) + rCfg.DisableBackPressure = true + tReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, &mockTraceConsumer{}) + assert.Nil(t, err, "receiver creation failed") + assert.NotNil(t, tReceiver, "receiver creation failed") + assert.Equal(t, configmodels.DisableBackPressure, tReceiver.(*ZipkinReceiver).backPressureSetting) mReceiver, err := factory.CreateMetricsReceiver(zap.NewNop(), cfg, nil) assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) diff --git a/receiver/zipkinreceiver/testdata/config.yaml b/receiver/zipkinreceiver/testdata/config.yaml index bf6e7af9c88..668ee8098a8 100644 --- a/receiver/zipkinreceiver/testdata/config.yaml +++ b/receiver/zipkinreceiver/testdata/config.yaml @@ -3,6 +3,7 @@ receivers: zipkin/customname: endpoint: "127.0.0.1:8765" enabled: true + disable-backpressure: true processors: exampleprocessor: diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index f36d65bb5de..ce2fb8f0f14 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -17,7 +17,6 @@ package zipkinreceiver import ( "compress/gzip" "compress/zlib" - "context" "encoding/json" "errors" "fmt" @@ -37,6 +36,7 @@ import ( zipkinproto "github.com/openzipkin/zipkin-go/proto/v2" "go.opencensus.io/trace" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/internal" @@ -58,9 +58,10 @@ type ZipkinReceiver struct { mu sync.Mutex // addr is the address onto which the HTTP server will be bound - addr string - - nextConsumer consumer.TraceConsumer + addr string + host receiver.Host + backPressureSetting configmodels.BackPressureSetting + nextConsumer consumer.TraceConsumer startOnce sync.Once stopOnce sync.Once @@ -71,14 +72,15 @@ var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil) var _ http.Handler = (*ZipkinReceiver)(nil) // New creates a new zipkinreceiver.ZipkinReceiver reference. -func New(address string, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { +func New(address string, backPressureSetting configmodels.BackPressureSetting, nextConsumer consumer.TraceConsumer) (*ZipkinReceiver, error) { if nextConsumer == nil { return nil, errNilNextConsumer } zr := &ZipkinReceiver{ - addr: address, - nextConsumer: nextConsumer, + addr: address, + backPressureSetting: backPressureSetting, + nextConsumer: nextConsumer, } return zr, nil } @@ -102,6 +104,10 @@ func (zr *ZipkinReceiver) TraceSource() string { // StartTraceReception spins up the receiver's HTTP server and makes the receiver start its processing. func (zr *ZipkinReceiver) StartTraceReception(host receiver.Host) error { + if host == nil { + return errors.New("nil host") + } + zr.mu.Lock() defer zr.mu.Unlock() @@ -114,13 +120,13 @@ func (zr *ZipkinReceiver) StartTraceReception(host receiver.Host) error { return } + zr.host = host server := &http.Server{Handler: zr} + zr.server = server go func() { host.ReportFatalError(server.Serve(ln)) }() - zr.server = server - err = nil }) @@ -288,14 +294,49 @@ const ( // unmarshals them and sends them along to the nextConsumer. func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Trace this method - ctx, span := trace.StartSpan(context.Background(), "ZipkinReceiver.Export") + parentCtx := r.Context() + ctx, span := trace.StartSpan(parentCtx, "ZipkinReceiver.Export") defer span.End() // If the starting RPC has a parent span, then add it as a parent link. - // TODO: parentCtx should be direct parent for the span created here. - parentCtx := r.Context() observability.SetParentLink(parentCtx, span) + // Now deserialize and process the spans. + asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans") + + var receiverTagValue string + if asZipkinv1 { + receiverTagValue = zipkinV1TagValue + } else { + receiverTagValue = zipkinV2TagValue + } + + ctxWithReceiverName := observability.ContextWithReceiverName(ctx, receiverTagValue) + + if !zr.host.OkToIngest() { + var responseStatusCode int + var zPageMessage string + if zr.backPressureSetting == configmodels.EnableBackPressure { + responseStatusCode = http.StatusServiceUnavailable + zPageMessage = "Host blocked ingestion. Back pressure is ON." + } else { + responseStatusCode = http.StatusAccepted + zPageMessage = "Host blocked ingestion. Back pressure is OFF." + } + + // Internal z-page status does not depend on backpressure setting. + span.SetStatus(trace.Status{ + Code: trace.StatusCodeUnavailable, + Message: zPageMessage, + }) + + observability.RecordIngestionBlockedMetrics( + ctxWithReceiverName, + zr.backPressureSetting) + w.WriteHeader(responseStatusCode) + return + } + pr := processBodyIfNecessary(r) slurp, err := ioutil.ReadAll(pr) if c, ok := pr.(io.Closer); ok { @@ -303,18 +344,11 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { } _ = r.Body.Close() - // Now deserialize and process the spans. - asZipkinv1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans") - var tds []consumerdata.TraceData - - var receiverTagValue string if asZipkinv1 { tds, err = zr.v1ToTraceSpans(slurp, r.Header) - receiverTagValue = zipkinV1TagValue } else { tds, err = zr.v2ToTraceSpans(slurp, r.Header) - receiverTagValue = zipkinV2TagValue } if err != nil { @@ -326,7 +360,6 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - ctxWithReceiverName := observability.ContextWithReceiverName(ctx, receiverTagValue) tdsSize := 0 for _, td := range tds { td.SourceFormat = "zipkin" diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index a86d20d7e90..0276791d1dc 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "strings" "testing" "time" @@ -32,12 +33,17 @@ import ( openzipkin "github.com/openzipkin/zipkin-go" zipkinmodel "github.com/openzipkin/zipkin-go/model" zhttp "github.com/openzipkin/zipkin-go/reporter/http" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" "github.com/open-telemetry/opentelemetry-service/internal" "github.com/open-telemetry/opentelemetry-service/internal/testutils" + "github.com/open-telemetry/opentelemetry-service/observability/observabilitytest" + "github.com/open-telemetry/opentelemetry-service/receiver" "github.com/open-telemetry/opentelemetry-service/receiver/receivertest" spandatatranslator "github.com/open-telemetry/opentelemetry-service/translator/trace/spandata" ) @@ -136,7 +142,7 @@ func TestNew(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := New(tt.args.address, tt.args.nextConsumer) + got, err := New(tt.args.address, configmodels.EnableBackPressure, tt.args.nextConsumer) if err != tt.wantErr { t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) return @@ -158,7 +164,7 @@ func TestZipkinReceiverPortAlreadyInUse(t *testing.T) { if err != nil { t.Fatalf("failed to split listener address: %v", err) } - traceReceiver, err := New(":"+portStr, exportertest.NewNopTraceExporter()) + traceReceiver, err := New(":"+portStr, configmodels.EnableBackPressure, exportertest.NewNopTraceExporter()) if err != nil { t.Fatalf("Failed to create receiver: %v", err) } @@ -521,3 +527,158 @@ func TestConversionRoundtrip(t *testing.T) { t.Errorf("The roundtrip JSON doesn't match the JSON that we want\nGot:\n%s\nWant:\n%s", gj, wj) } } + +func TestStartTraceReception(t *testing.T) { + tests := []struct { + name string + host receiver.Host + wantErr bool + }{ + { + name: "nil_host", + wantErr: true, + }, + { + name: "valid_host", + host: receivertest.NewMockHost(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sink := new(exportertest.SinkTraceExporter) + zr, err := New("127.0.0.1:0", configmodels.DisableBackPressure, sink) + require.Nil(t, err) + require.NotNil(t, zr) + + if err := zr.StartTraceReception(tt.host); (err != nil) != tt.wantErr { + t.Errorf("StartTraceReception error = %v, wantErr %v", err, tt.wantErr) + } + if !tt.wantErr { + require.Nil(t, zr.StopTraceReception()) + } + }) + } +} + +func TestZipkinExporter_HostIngestionStatusChanges(t *testing.T) { + const jsonZipkinData = `[{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385","parentId": "86154a4ba6e91385","id": "4d1e00c0db9010db", + "kind": "CLIENT","name": "get", + "timestamp": 1472470996199000,"duration": 207000, + "localEndpoint": {"serviceName": "frontend","ipv6": "7::80:807f"}, + "remoteEndpoint": {"serviceName": "backend","ipv4": "192.168.99.101","port": 9000}, + "annotations": [ + {"timestamp": 1472470996238000,"value": "foo"}, + {"timestamp": 1472470996403000,"value": "bar"} +], +"tags": {"http.path": "/api","clnt/finagle.version": "6.45.0"} +}]` + type ingestionStateTest struct { + okToIngest bool + expectedHTTPStatus int + } + tests := []struct { + name string + backPressureSetting configmodels.BackPressureSetting + expectedReceivedBatches int + expectedIngestionBlockedRPCs int + expectedIngestionBlockedRPCsNoBackPressure int + ingestionStates []ingestionStateTest + }{ + { + name: "EnableBackPressure", + backPressureSetting: configmodels.EnableBackPressure, + expectedReceivedBatches: 2, + expectedIngestionBlockedRPCs: 1, + expectedIngestionBlockedRPCsNoBackPressure: 0, + ingestionStates: []ingestionStateTest{ + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + { + okToIngest: false, + expectedHTTPStatus: http.StatusServiceUnavailable, + }, + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + }, + }, + { + name: "DisableBackPressure", + backPressureSetting: configmodels.DisableBackPressure, + expectedReceivedBatches: 2, + expectedIngestionBlockedRPCs: 1, + expectedIngestionBlockedRPCsNoBackPressure: 1, + ingestionStates: []ingestionStateTest{ + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + { + okToIngest: false, + expectedHTTPStatus: http.StatusAccepted, + }, + { + okToIngest: true, + expectedHTTPStatus: http.StatusAccepted, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + doneFn := observabilitytest.SetupRecordedMetricsTest() + defer doneFn() + + sink := new(exportertest.SinkTraceExporter) + zr, err := New("127.0.0.1:0", tt.backPressureSetting, sink) + require.Nil(t, err) + require.NotNil(t, zr) + defer zr.StopTraceReception() + + host := receivertest.NewMockHost().(*receivertest.MockHost) + err = zr.StartTraceReception(host) + require.Nil(t, err) + + for _, ingestionState := range tt.ingestionStates { + host.SetOkToIngest(ingestionState.okToIngest) + rw := httptest.NewRecorder() + req := httptest.NewRequest( + "POST", + "https://tld.org/", + strings.NewReader(jsonZipkinData)) + zr.ServeHTTP(rw, req) + assert.Equal(t, ingestionState.expectedHTTPStatus, rw.Code) + } + + require.Equal(t, tt.expectedReceivedBatches, len(sink.AllTraces())) + require.Nil( + t, + observabilitytest.CheckValueViewReceiverReceivedSpans( + zipkinV2TagValue, + tt.expectedReceivedBatches), + ) + require.Nil( + t, + observabilitytest.CheckValueViewReceiverIngestionBlockedRPCs( + zipkinV2TagValue, + tt.expectedIngestionBlockedRPCs), + ) + + // This view should only have data if ingestion was blocked and there was no back pressure. + err = observabilitytest.CheckValueViewReceiverIngestionBlockedRPCsWithDataLoss( + zipkinV2TagValue, + tt.expectedIngestionBlockedRPCsNoBackPressure) + if tt.expectedIngestionBlockedRPCsNoBackPressure == 0 { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + }) + } +}