Skip to content

Commit

Permalink
Enhance and refactor some integration tests (#34920)
Browse files Browse the repository at this point in the history
* add waitUntilEventCountCtx so it can time out or be cancelled
* rename methods to better reflect what they actually do
  • Loading branch information
AndersonQ authored Apr 3, 2023
1 parent 324aae0 commit 3e54994
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 64 deletions.
47 changes: 39 additions & 8 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
// under the License.

//go:build integration
// +build integration

package filestream

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -122,25 +120,25 @@ func (e *inputTestingEnvironment) waitUntilInputStops() {
e.wg.Wait()
}

func (e *inputTestingEnvironment) mustWriteLinesToFile(filename string, lines []byte) {
func (e *inputTestingEnvironment) mustWriteToFile(filename string, data []byte) {
path := e.abspath(filename)
err := ioutil.WriteFile(path, lines, 0o644)
err := os.WriteFile(path, data, 0o644)
if err != nil {
e.t.Fatalf("failed to write file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) mustAppendLinesToFile(filename string, lines []byte) {
func (e *inputTestingEnvironment) mustAppendToFile(filename string, data []byte) {
path := e.abspath(filename)
f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
e.t.Fatalf("failed to open file '%s': %+v", path, err)
}
defer f.Close()

_, err = f.Write(lines)
_, err = f.Write(data)
if err != nil {
e.t.Fatalf("append lines to file '%s': %+v", path, err)
e.t.Fatalf("append data to file '%s': %+v", path, err)
}
}

Expand Down Expand Up @@ -335,6 +333,38 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
}
}

// waitUntilEventCountCtx calls waitUntilEventCount, but fails if ctx is cancelled.
func (e *inputTestingEnvironment) waitUntilEventCountCtx(ctx context.Context, count int) {
e.t.Helper()
ch := make(chan struct{})

go func() {
e.waitUntilEventCount(count)
ch <- struct{}{}
}()

select {
case <-ctx.Done():
logLines := map[string][]string{}
for _, e := range e.pipeline.GetAllEvents() {
flat := e.Fields.Flatten()
pathi, _ := flat.GetValue("log.file.path")
path := pathi.(string)
msgi, _ := flat.GetValue("message")
msg := msgi.(string)
logLines[path] = append(logLines[path], msg)
}

e.t.Fatalf("waitUntilEventCountCtx: %v. Want %d events, got %d: %v",
ctx.Err(),
count,
len(e.pipeline.GetAllEvents()),
logLines)
case <-ch:
return
}
}

// waitUntilAtLeastEventCount waits until at least count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilAtLeastEventCount(count int) {
for {
Expand Down Expand Up @@ -378,7 +408,8 @@ func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
}
}

require.Equal(e.t, 0, len(missingEvents), "following events are missing: %+v", missingEvents)
require.Equal(e.t, 0, len(missingEvents),
"following events are missing: %+v", missingEvents)
}

func (e *inputTestingEnvironment) getOutputMessages() []string {
Expand Down
Loading

0 comments on commit 3e54994

Please sign in to comment.