Add filestream benchmarks for many files case, fix data race (#37345)
Before, the benchmark only covered a single file; now the tests support
many files.

During test runs with the race detector, a data race was found and fixed.
rdner authored Dec 18, 2023
1 parent 6759826 commit a633696
Showing 2 changed files with 111 additions and 62 deletions.
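For context on the race: openFile used to write the detected encoding into the encoding field of the shared filestream struct, and since the harvesters for an input appear to run concurrently against that same struct value, concurrent writes to the field could race. The fix has openFile return the encoding to its caller instead. Below is a minimal, hypothetical Go sketch of the pattern; the names are illustrative only, not the actual Beats types.

package main

import "sync"

type encoding string

type sharedInput struct {
	enc encoding // writing this from concurrent harvesters is the race
}

// racy: stores the per-file result on the shared struct.
func (in *sharedInput) openRacy(path string) encoding {
	in.enc = encoding("detected for " + path)
	return in.enc
}

// fixed: the per-file result stays local and is returned to the caller.
func (in *sharedInput) openFixed(path string) encoding {
	return encoding("detected for " + path)
}

func main() {
	in := &sharedInput{}
	var wg sync.WaitGroup
	for _, p := range []string{"a.log", "b.log"} {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			_ = in.openFixed(p) // swapping in openRacy here would be reported by the race detector
		}(p)
	}
	wg.Wait()
}

In the actual change below, openFile now returns (*os.File, encoding.Encoding, error) and open passes the returned value straight into readfile.NewEncodeReader, so nothing per-file is stored on the shared struct anymore.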
27 changes: 13 additions & 14 deletions filebeat/input/filestream/input.go
@@ -59,7 +59,6 @@ type fileMeta struct {
type filestream struct {
readerConfig readerConfig
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
closerConfig closerConfig
parsers parser.Config
}
@@ -175,7 +174,7 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
}

func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSource, offset int64) (reader.Reader, error) {
f, err := inp.openFile(log, fs.newPath, offset)
f, encoding, err := inp.openFile(log, fs.newPath, offset)
if err != nil {
return nil, err
}
@@ -216,7 +215,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

var r reader.Reader
r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
Codec: inp.encoding,
Codec: encoding,
BufferSize: inp.readerConfig.BufferSize,
Terminator: inp.readerConfig.LineTerminator,
MaxBytes: encReaderMaxBytes,
@@ -241,33 +240,33 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
// or the file cannot be opened because for example of failing read permissions, an error
// is returned and the harvester is closed. The file will be picked up again the next time
// the file system is scanned
func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, error) {
func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, encoding.Encoding, error) {
fi, err := os.Stat(path)
if err != nil {
return nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
}

// it must be checked if the file is not a named pipe before we try to open it
// if it is a named pipe os.OpenFile fails, so there is no need to try opening it.
if fi.Mode()&os.ModeNamedPipe != 0 {
return nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
return nil, nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
}

ok := false
f, err := file.ReadOpen(path)
if err != nil {
return nil, fmt.Errorf("failed opening %s: %w", path, err)
return nil, nil, fmt.Errorf("failed opening %s: %w", path, err)
}
defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))

fi, err = f.Stat()
if err != nil {
return nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
}

err = checkFileBeforeOpening(fi)
if err != nil {
return nil, err
return nil, nil, err
}

if fi.Size() < offset {
@@ -276,20 +275,20 @@ func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*o
}
err = inp.initFileOffset(f, offset)
if err != nil {
return nil, err
return nil, nil, err
}

inp.encoding, err = inp.encodingFactory(f)
encoding, err := inp.encodingFactory(f)
if err != nil {
f.Close()
if errors.Is(err, transform.ErrShortSrc) {
return nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
return nil, nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
}
return nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
return nil, nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
}
ok = true

return f, nil
return f, encoding, nil
}

func checkFileBeforeOpening(fi os.FileInfo) error {
146 changes: 98 additions & 48 deletions filebeat/input/filestream/input_test.go
@@ -21,10 +21,11 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
@@ -38,25 +39,26 @@ import (

func BenchmarkFilestream(b *testing.B) {
logp.TestingSetup(logp.ToDiscardOutput())
lineCount := 10000
filename := generateFile(b, lineCount)

b.ResetTimer()
b.Run("single file", func(b *testing.B) {
lineCount := 10000
filename := generateFile(b, b.TempDir(), lineCount)
b.ResetTimer()

b.Run("filestream default throughput", func(b *testing.B) {
cfg := `
b.Run("inode throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
}
})
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("one-file-inode-benchmark-%d", i), cfg, lineCount)
}
})

b.Run("filestream fingerprint throughput", func(b *testing.B) {
cfg := `
b.Run("fingerprint throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner:
fingerprint.enabled: true
@@ -65,9 +67,51 @@ file_identity.fingerprint: ~
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("one-file-fp-benchmark-%d", i), cfg, lineCount)
}
})
})

b.Run("many files", func(b *testing.B) {
lineCount := 1000
fileCount := 100
dir := b.TempDir()

for i := 0; i < fileCount; i++ {
_ = generateFile(b, dir, lineCount)
}

ingestPath := filepath.Join(dir, "*")
expEvents := lineCount * fileCount
b.ResetTimer()

b.Run("inode throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
paths:
- ` + ingestPath + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("many-files-inode-benchmark-%d", i), cfg, expEvents)
}
})

b.Run("fingerprint throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner:
fingerprint.enabled: true
check_interval: 1s
file_identity.fingerprint: ~
paths:
- ` + ingestPath + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("many-files-fp-benchmark-%d", i), cfg, expEvents)
}
})
})
}

@@ -76,23 +120,25 @@ paths:
// `cfg` must be a valid YAML string containing valid filestream configuration
// `expEventCount` is an expected amount of produced events
func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) {
b.Helper()
// we don't include initialization in the benchmark time
b.StopTimer()
runner := createFilestreamTestRunner(b, testID, cfg, expEventCount)
runner := createFilestreamTestRunner(context.Background(), b, testID, cfg, int64(expEventCount), false)
// this is where the benchmark actually starts
b.StartTimer()
events := runner(b)
require.Len(b, events, expEventCount)
_ = runner(b)
}

// createFilestreamTestRunner can be used for both benchmarks and regular tests to run a filestream input
// with the given configuration and event limit.
// `testID` must be unique for each test run
// `cfg` must be a valid YAML string containing valid filestream configuration
// `eventLimit` is an amount of produced events after which the filestream will shutdown
// `collectEvents` if `true` the runner will return a list of all events produced by the filestream input.
// Events should not be collected in benchmarks due to high extra costs of using the channel.
//
// returns a runner function that returns produced events.
func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLimit int) func(t testing.TB) []beat.Event {
func createFilestreamTestRunner(ctx context.Context, b testing.TB, testID string, cfg string, eventLimit int64, collectEvents bool) func(t testing.TB) []beat.Event {
logger := logp.L()
c, err := conf.NewConfigWithYAML([]byte(cfg), cfg)
require.NoError(b, err)
@@ -101,41 +147,43 @@ func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLi
input, err := p.Manager.Create(c)
require.NoError(b, err)

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
context := v2.Context{
Logger: logger,
ID: testID,
Cancelation: ctx,
}

events := make([]beat.Event, 0, eventLimit)
connector, eventsDone := newTestPipeline(eventLimit, &events)
done := make(chan struct{})
connector, events := newTestPipeline(eventLimit, collectEvents)
var out []beat.Event
if collectEvents {
out = make([]beat.Event, 0, eventLimit)
}
go func() {
// even if `collectEvents` is false we need to range the channel
// and wait until it's closed indicating that the input finished its job
for event := range events {
out = append(out, event)
}
cancel()
}()

return func(t testing.TB) []beat.Event {
go func() {
err := input.Run(context, connector)
assert.NoError(b, err)
close(done)
}()
err := input.Run(context, connector)
require.NoError(b, err)

<-eventsDone
cancel()
<-done // for more stable results we should wait until the full shutdown
return events
return out
}
}

func generateFile(t testing.TB, lineCount int) string {
func generateFile(t testing.TB, dir string, lineCount int) string {
t.Helper()
dir := t.TempDir()
file, err := os.CreateTemp(dir, "lines.log")
file, err := os.CreateTemp(dir, "*")
require.NoError(t, err)

filename := file.Name()
for i := 0; i < lineCount; i++ {
fmt.Fprintf(file, "rather mediocre log line message - %d\n", i)
fmt.Fprintf(file, "rather mediocre log line message in %s - %d\n", filename, i)
}
filename := file.Name()
err = file.Close()
require.NoError(t, err)
return filename
@@ -161,15 +209,15 @@ func (s *testStore) CleanupInterval() time.Duration {
return time.Second
}

func newTestPipeline(eventLimit int, out *[]beat.Event) (pc beat.PipelineConnector, done <-chan struct{}) {
ch := make(chan struct{})
return &testPipeline{limit: eventLimit, done: ch, out: out}, ch
func newTestPipeline(eventLimit int64, collectEvents bool) (pc beat.PipelineConnector, out <-chan beat.Event) {
ch := make(chan beat.Event, eventLimit)
return &testPipeline{limit: eventLimit, out: ch, collect: collectEvents}, ch
}

type testPipeline struct {
done chan struct{}
limit int
out *[]beat.Event
limit int64
out chan beat.Event
collect bool
}

func (p *testPipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
@@ -184,13 +232,15 @@ type testClient struct {
}

func (c *testClient) Publish(event beat.Event) {
c.testPipeline.limit--
if c.testPipeline.limit < 0 {
newLimit := atomic.AddInt64(&c.testPipeline.limit, -1)
if newLimit < 0 {
return
}
*c.testPipeline.out = append(*c.testPipeline.out, event)
if c.testPipeline.limit == 0 {
close(c.testPipeline.done)
if c.testPipeline.collect {
c.testPipeline.out <- event
}
if newLimit == 0 {
close(c.testPipeline.out)
}
}

Expand Down

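To try the new benchmarks locally, standard Go tooling should apply: something like `go test -run '^$' -bench BenchmarkFilestream -benchtime 1x ./filebeat/input/filestream/` runs only the benchmark, and adding `-race` exercises the data-race fix as well. The flags shown are ordinary `go test` options; the exact invocation in the Beats build setup may differ.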