diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go
index 85783b00361a..597bc511bb08 100644
--- a/internal/otelarrow/test/e2e_test.go
+++ b/internal/otelarrow/test/e2e_test.go
@@ -35,7 +35,6 @@ import (
 	"go.opentelemetry.io/otel/sdk/trace/tracetest"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
-	"go.uber.org/zap/zaptest"
 	"go.uber.org/zap/zaptest/observer"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -83,14 +82,18 @@ func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
 	return tc.sink.ConsumeTraces(ctx, td)
 }
 
-func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
+func testLoggerSettings(_ *testing.T) (component.TelemetrySettings, *observer.ObservedLogs, *tracetest.InMemoryExporter) {
 	tset := componenttest.NewNopTelemetrySettings()
 
 	core, obslogs := observer.New(zapcore.InfoLevel)
 
 	exp := tracetest.NewInMemoryExporter()
 
-	tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
+	// Note: To debug any of the logs-based assertions in this test, uncomment
+	// the following line:
+	//
+	// tset.Logger = zap.New(zapcore.NewTee(core, zaptest.NewLogger(t).Core()))
+	tset.Logger = zap.New(core)
 	tset.TracerProvider = trace.NewTracerProvider(trace.WithSyncer(exp))
 
 	return tset, obslogs, exp
@@ -329,6 +332,9 @@ func logSigs(obs *observer.ObservedLogs) (map[string]int, []string) {
 		for _, f := range rl.Context {
 			attrs = append(attrs, f.Key)
 
+			// One way we can see memory limit errors is through the
+			// OTel-Arrow common "arrow stream error" message, which both
+			// sides will log.
 			if rl.Message == "arrow stream error" && f.Key == "message" {
 				msgs = append(msgs, f.String)
 			}
@@ -346,7 +352,11 @@ var limitRegexp = regexp.MustCompile(`memory limit exceeded`)
 
 func countMemoryLimitErrors(msgs []string) (cnt int) {
 	for _, msg := range msgs {
-		if limitRegexp.MatchString(msg) {
+		// The memory errors are expected from the receiver,
+		// so whether these print on the exporter or receiver,
+		// the message will contain "otel-arrow decode" from
+		// the receiver.
+		if limitRegexp.MatchString(msg) && strings.Contains(msg, "otel-arrow decode") {
 			cnt++
 		}
 	}
@@ -357,12 +367,12 @@ func failureMemoryLimitEnding(t *testing.T, _ testParams, testCon *testConsumer,
 	eSigs, eMsgs := logSigs(testCon.expLogs)
 	rSigs, rMsgs := logSigs(testCon.recvLogs)
 
-	// Test for arrow stream errors.
-	require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eSigs)
+	// Test for arrow receiver stream errors on both sides.
+	require.Less(t, 0, eSigs["arrow stream error|||code///message///where"], "should have exporter arrow stream errors: %v", eMsgs)
 	require.Less(t, 0, rSigs["arrow stream error|||code///message///where"], "should have receiver arrow stream errors: %v", rSigs)
 
-	// Ensure the errors include memory limit errors.
-
+	// Ensure both side's error logs include memory limit errors
+	// one way or another.
 	require.Less(t, 0, countMemoryLimitErrors(rMsgs), "should have memory limit errors: %v", rMsgs)
 	require.Less(t, 0, countMemoryLimitErrors(eMsgs), "should have memory limit errors: %v", eMsgs)
 
@@ -374,7 +384,9 @@ func consumerSuccess(t *testing.T, err error) {
 }
 
 func consumerFailure(t *testing.T, err error) {
-	require.Error(t, err)
+	if err == nil {
+		return
+	}
 
 	// there should be no permanent errors anywhere in this test.
 	require.False(t, consumererror.IsPermanent(err),
@@ -414,32 +426,37 @@ func TestIntegrationTracesSimple(t *testing.T) {
 }
 
 func TestIntegrationMemoryLimited(t *testing.T) {
-	// This test is flaky, it only shows on Windows. This will be
-	// addressed in a separate PR.
-	t.Skip("test flake disabled")
-
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	// until 10 threads can write 100 spans
+	// until exporter and receiver finish at least one ArrowTraces span.
 	params := testParams{
 		threadCount: 10,
 		requestUntil: func(test *testConsumer) bool {
-			cnt := 0
-			for _, span := range test.expSpans.GetSpans() {
-				if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
-					cnt++
+			cf := func(spans tracetest.SpanStubs) (cnt int) {
+				for _, span := range spans {
+					if span.Name == "opentelemetry.proto.experimental.arrow.v1.ArrowTracesService/ArrowTraces" {
+						cnt++
+					}
 				}
+				return
 			}
-			return cnt == 0 || test.sentSpans.Load() < 100
-
+			rcnt := cf(test.recvSpans.GetSpans())
+			ecnt := cf(test.expSpans.GetSpans())
+			return ecnt == 0 || rcnt == 0
 		},
 	}
 
 	testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) {
 		rcfg.Arrow.MemoryLimitMiB = 1
 		ecfg.Arrow.NumStreams = 10
+		// Shorten timeouts for this test, because we intend
+		// for it to fail and don't want to wait for retries.
 		ecfg.TimeoutSettings.Timeout = 5 * time.Second
+		ecfg.RetryConfig.InitialInterval = 1 * time.Second
+		ecfg.RetryConfig.MaxInterval = 2 * time.Second
+		ecfg.RetryConfig.MaxElapsedTime = 30 * time.Second
+		ecfg.Arrow.MaxStreamLifetime = 5 * time.Second
 	}, bulkyGenFunc(), consumerFailure, failureMemoryLimitEnding)
 }
 