From c4f86d6869aebc9521f69ed8aace55ff8375a645 Mon Sep 17 00:00:00 2001 From: niraj-elastic <124254029+niraj-elastic@users.noreply.github.com> Date: Mon, 3 Apr 2023 18:32:40 +0530 Subject: [PATCH] [Filebeat][CometD] Resolve flaky unit test (#34903) * update unit test * add change log entry --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/cometd/input_test.go | 49 ++++++++++------------ 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 52fa1851f81..1adf4c125ff 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689] - [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914] - Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770] +- Fix TestMultiEventForEOFRetryHandlerInput unit test of CometD input {pull}34903[34903] *Heartbeat* diff --git a/x-pack/filebeat/input/cometd/input_test.go b/x-pack/filebeat/input/cometd/input_test.go index 6f0814753a4..0dcc17781aa 100644 --- a/x-pack/filebeat/input/cometd/input_test.go +++ b/x-pack/filebeat/input/cometd/input_test.go @@ -393,18 +393,16 @@ func assertEventMatches(t *testing.T, expected bay.MaybeMsg, got beat.Event) { func TestMultiEventForEOFRetryHandlerInput(t *testing.T) { var err error - errorAfterEvent := 2 - expectedHTTPEventCount := 6 - expectedEventCount := 4 + expectedEventCount := 2 - eventsCh := make(chan beat.Event) - defer close(eventsCh) + eventsCh := make(chan beat.Event, expectedEventCount) signal := make(chan struct{}, 1) defer close(signal) outlet := &mockedOutleter{ onEventHandler: func(event beat.Event) bool { eventsCh <- event + signal <- struct{}{} return true }, } @@ -442,16 +440,11 @@ func TestMultiEventForEOFRetryHandlerInput(t *testing.T) { _, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`)) return case "/meta/connect": - if i < expectedHTTPEventCount { - if i == errorAfterEvent { - // stop server to produce EOF errors - signal <- struct{}{} - } - i++ + if i == 0 { _, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`)) + i++ return } - i++ _, _ = w.Write([]byte(`{}`)) return case "/meta/subscribe": @@ -474,31 +467,33 @@ func TestMultiEventForEOFRetryHandlerInput(t *testing.T) { require.NotNil(t, input) input.Run() - go func() { - j := 0 - for event := range eventsCh { - if j >= expectedEventCount { - signal <- struct{}{} - break - } - assertEventMatches(t, expected, event) - j++ - } - }() - <-signal // close previous connection + <-signal server.CloseClientConnections() server.Close() time.Sleep(100 * time.Millisecond) // restart connection for new events + i = 0 server, err = newTestServer(strings.Split(serverURL, "http://")[1], r) - assert.NoError(t, err) - serverURL = server.URL - defer server.Close() + for err != nil { + server, err = newTestServer(strings.Split(serverURL, "http://")[1], r) + } <-signal + server.CloseClientConnections() + server.Close() + close(eventsCh) + + go func() { + for j := 0; j < expectedEventCount; j++ { + event := <-eventsCh + assertEventMatches(t, expected, event) + } + signal <- struct{}{} + }() + <-signal input.Stop() }