Skip to content

Commit

Permalink
[Filebeat][CometD] Resolve flaky unit test (#34903)
Browse files Browse the repository at this point in the history
* update unit test

* add change log entry
  • Loading branch information
niraj-elastic authored Apr 3, 2023
1 parent 3e54994 commit c4f86d6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689]
- [Gcs Input] - Added missing locks for safe concurrency {pull}34914[34914]
- Fix the ignore_inactive option being ignored in Filebeat's filestream input {pull}34770[34770]
- Fix TestMultiEventForEOFRetryHandlerInput unit test of CometD input {pull}34903[34903]

*Heartbeat*

Expand Down
49 changes: 22 additions & 27 deletions x-pack/filebeat/input/cometd/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,18 +393,16 @@ func assertEventMatches(t *testing.T, expected bay.MaybeMsg, got beat.Event) {
func TestMultiEventForEOFRetryHandlerInput(t *testing.T) {
var err error

errorAfterEvent := 2
expectedHTTPEventCount := 6
expectedEventCount := 4
expectedEventCount := 2

eventsCh := make(chan beat.Event)
defer close(eventsCh)
eventsCh := make(chan beat.Event, expectedEventCount)
signal := make(chan struct{}, 1)
defer close(signal)

outlet := &mockedOutleter{
onEventHandler: func(event beat.Event) bool {
eventsCh <- event
signal <- struct{}{}
return true
},
}
Expand Down Expand Up @@ -442,16 +440,11 @@ func TestMultiEventForEOFRetryHandlerInput(t *testing.T) {
_, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`))
return
case "/meta/connect":
if i < expectedHTTPEventCount {
if i == errorAfterEvent {
// stop server to produce EOF errors
signal <- struct{}{}
}
i++
if i == 0 {
_, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`))
i++
return
}
i++
_, _ = w.Write([]byte(`{}`))
return
case "/meta/subscribe":
Expand All @@ -474,31 +467,33 @@ func TestMultiEventForEOFRetryHandlerInput(t *testing.T) {
require.NotNil(t, input)

input.Run()
go func() {
j := 0
for event := range eventsCh {
if j >= expectedEventCount {
signal <- struct{}{}
break
}
assertEventMatches(t, expected, event)
j++
}
}()

<-signal
// close previous connection
<-signal
server.CloseClientConnections()
server.Close()
time.Sleep(100 * time.Millisecond)

// restart connection for new events
i = 0
server, err = newTestServer(strings.Split(serverURL, "http://")[1], r)
assert.NoError(t, err)
serverURL = server.URL
defer server.Close()
for err != nil {
server, err = newTestServer(strings.Split(serverURL, "http://")[1], r)
}
<-signal
server.CloseClientConnections()
server.Close()

close(eventsCh)

go func() {
for j := 0; j < expectedEventCount; j++ {
event := <-eventsCh
assertEventMatches(t, expected, event)
}
signal <- struct{}{}
}()
<-signal
input.Stop()
}

Expand Down

0 comments on commit c4f86d6

Please sign in to comment.