diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index b935161c1269..6cedd1bd9d16 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -59,7 +59,6 @@ type fileMeta struct {
 type filestream struct {
 	readerConfig    readerConfig
 	encodingFactory encoding.EncodingFactory
-	encoding        encoding.Encoding
 	closerConfig    closerConfig
 	parsers         parser.Config
 }
@@ -175,7 +174,7 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
 }
 
 func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSource, offset int64) (reader.Reader, error) {
-	f, err := inp.openFile(log, fs.newPath, offset)
+	f, encoding, err := inp.openFile(log, fs.newPath, offset)
 	if err != nil {
 		return nil, err
 	}
@@ -216,7 +215,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
 
 	var r reader.Reader
 	r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
-		Codec:      inp.encoding,
+		Codec:      encoding,
 		BufferSize: inp.readerConfig.BufferSize,
 		Terminator: inp.readerConfig.LineTerminator,
 		MaxBytes:   encReaderMaxBytes,
@@ -241,33 +240,33 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
 // or the file cannot be opened because for example of failing read permissions, an error
 // is returned and the harvester is closed. The file will be picked up again the next time
 // the file system is scanned
-func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, error) {
+func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, encoding.Encoding, error) {
 	fi, err := os.Stat(path)
 	if err != nil {
-		return nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
+		return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
 	}
 
 	// it must be checked if the file is not a named pipe before we try to open it
 	// if it is a named pipe os.OpenFile fails, so there is no need to try opening it.
 	if fi.Mode()&os.ModeNamedPipe != 0 {
-		return nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
+		return nil, nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
 	}
 
 	ok := false
 	f, err := file.ReadOpen(path)
 	if err != nil {
-		return nil, fmt.Errorf("failed opening %s: %w", path, err)
+		return nil, nil, fmt.Errorf("failed opening %s: %w", path, err)
 	}
 	defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))
 
 	fi, err = f.Stat()
 	if err != nil {
-		return nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
+		return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
 	}
 
 	err = checkFileBeforeOpening(fi)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	if fi.Size() < offset {
@@ -276,20 +275,20 @@ func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*o
 	}
 	err = inp.initFileOffset(f, offset)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
-	inp.encoding, err = inp.encodingFactory(f)
+	encoding, err := inp.encodingFactory(f)
 	if err != nil {
 		f.Close()
 		if errors.Is(err, transform.ErrShortSrc) {
-			return nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
+			return nil, nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
 		}
-		return nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
+		return nil, nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
 	}
 	ok = true
 
-	return f, nil
+	return f, encoding, nil
 }
 
 func checkFileBeforeOpening(fi os.FileInfo) error {
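The input.go change above removes the `encoding` field from the shared `filestream` struct and instead threads the detected encoding through `openFile`'s return values. Since one filestream input can run several harvesters concurrently (one per file), every open used to write the detected encoding to the same shared field; returning it keeps all per-file state on the harvester's own stack. A minimal standalone sketch of the before/after shape (hypothetical names, not the Beats API):

package main

import "fmt"

// enc stands in for encoding.Encoding; detectEncoding stands in for the
// encoding factory that inspects the opened file.
type enc string

func detectEncoding(path string) enc { return enc("utf-8:" + path) }

type input struct {
	enc enc // before: shared field, racy when harvesters run concurrently
}

// openShared mimics the old shape: every call mutates the shared input.
func (in *input) openShared(path string) { in.enc = detectEncoding(path) }

// open mimics the new shape: each caller receives its own value and the
// shared input is never mutated after construction.
func (in *input) open(path string) enc { return detectEncoding(path) }

func main() {
	in := &input{}
	fmt.Println(in.open("/var/log/a.log")) // each harvester gets its own encoding
	fmt.Println(in.open("/var/log/b.log"))
}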
diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go
index 55b8d2e7fc66..a1d9729c5aad 100644
--- a/filebeat/input/filestream/input_test.go
+++ b/filebeat/input/filestream/input_test.go
@@ -21,10 +21,11 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"path/filepath"
+	"sync/atomic"
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
@@ -38,25 +39,26 @@ import (
 func BenchmarkFilestream(b *testing.B) {
 	logp.TestingSetup(logp.ToDiscardOutput())
 
-	lineCount := 10000
-	filename := generateFile(b, lineCount)
-	b.ResetTimer()
+	b.Run("single file", func(b *testing.B) {
+		lineCount := 10000
+		filename := generateFile(b, b.TempDir(), lineCount)
+		b.ResetTimer()
 
-	b.Run("filestream default throughput", func(b *testing.B) {
-		cfg := `
+		b.Run("inode throughput", func(b *testing.B) {
+			cfg := `
 type: filestream
 prospector.scanner.check_interval: 1s
 paths:
   - ` + filename + `
 `
-		for i := 0; i < b.N; i++ {
-			runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
-		}
-	})
+			for i := 0; i < b.N; i++ {
+				runFilestreamBenchmark(b, fmt.Sprintf("one-file-inode-benchmark-%d", i), cfg, lineCount)
+			}
+		})
 
-	b.Run("filestream fingerprint throughput", func(b *testing.B) {
-		cfg := `
+		b.Run("fingerprint throughput", func(b *testing.B) {
+			cfg := `
 type: filestream
 prospector.scanner:
   fingerprint.enabled: true
   check_interval: 1s
@@ -65,9 +67,51 @@ file_identity.fingerprint: ~
 paths:
   - ` + filename + `
 `
-		for i := 0; i < b.N; i++ {
-			runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
+			for i := 0; i < b.N; i++ {
+				runFilestreamBenchmark(b, fmt.Sprintf("one-file-fp-benchmark-%d", i), cfg, lineCount)
+			}
+		})
+	})
+
+	b.Run("many files", func(b *testing.B) {
+		lineCount := 1000
+		fileCount := 100
+		dir := b.TempDir()
+
+		for i := 0; i < fileCount; i++ {
+			_ = generateFile(b, dir, lineCount)
 		}
+
+		ingestPath := filepath.Join(dir, "*")
+		expEvents := lineCount * fileCount
+		b.ResetTimer()
+
+		b.Run("inode throughput", func(b *testing.B) {
+			cfg := `
+type: filestream
+prospector.scanner.check_interval: 1s
+paths:
+  - ` + ingestPath + `
+`
+			for i := 0; i < b.N; i++ {
+				runFilestreamBenchmark(b, fmt.Sprintf("many-files-inode-benchmark-%d", i), cfg, expEvents)
+			}
+		})
+
+		b.Run("fingerprint throughput", func(b *testing.B) {
+			cfg := `
+type: filestream
+prospector.scanner:
+  fingerprint.enabled: true
+  check_interval: 1s
+file_identity.fingerprint: ~
+paths:
+  - ` + ingestPath + `
+`
+			for i := 0; i < b.N; i++ {
+				runFilestreamBenchmark(b, fmt.Sprintf("many-files-fp-benchmark-%d", i), cfg, expEvents)
+			}
+		})
 	})
 }
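For reference, the restructured sub-benchmarks can be invoked individually with standard Go tooling, e.g. `go test -run '^$' -bench 'BenchmarkFilestream/many_files' ./filebeat/input/filestream/` (an illustrative invocation; `go test` matches sub-benchmark names with spaces replaced by underscores).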
@@ -76,13 +120,13 @@ paths:
 // `cfg` must be a valid YAML string containing valid filestream configuration
 // `expEventCount` is an expected amount of produced events
 func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) {
+	b.Helper()
 	// we don't include initialization in the benchmark time
 	b.StopTimer()
-	runner := createFilestreamTestRunner(b, testID, cfg, expEventCount)
+	runner := createFilestreamTestRunner(context.Background(), b, testID, cfg, int64(expEventCount), false)
 	// this is where the benchmark actually starts
 	b.StartTimer()
-	events := runner(b)
-	require.Len(b, events, expEventCount)
+	_ = runner(b)
 }
 
 // createFilestreamTestRunner can be used for both benchmarks and regular tests to run a filestream input
@@ -90,9 +134,11 @@ func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCou
 // `testID` must be unique for each test run
 // `cfg` must be a valid YAML string containing valid filestream configuration
 // `eventLimit` is an amount of produced events after which the filestream will shutdown
+// `collectEvents` if `true` the runner will return a list of all events produced by the filestream input.
+// Events should not be collected in benchmarks due to high extra costs of using the channel.
 //
 // returns a runner function that returns produced events.
-func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLimit int) func(t testing.TB) []beat.Event {
+func createFilestreamTestRunner(ctx context.Context, b testing.TB, testID string, cfg string, eventLimit int64, collectEvents bool) func(t testing.TB) []beat.Event {
 	logger := logp.L()
 	c, err := conf.NewConfigWithYAML([]byte(cfg), cfg)
 	require.NoError(b, err)
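With the new signature, benchmarks pass `collectEvents=false` (as `runFilestreamBenchmark` does above) while a regular test can still collect the published events. A sketch of a hypothetical test-side caller (the config, test ID, and event count are made up for illustration):

func TestFilestreamRunnerSketch(t *testing.T) {
	cfg := `
type: filestream
paths:
  - /tmp/sketch.log
`
	// collectEvents=true: the returned runner hands back the published events.
	runner := createFilestreamTestRunner(context.Background(), t, "sketch-test", cfg, 100, true)
	events := runner(t)
	require.Len(t, events, 100)
}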
@@ -101,41 +147,43 @@ func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLi
 	input, err := p.Manager.Create(c)
 	require.NoError(b, err)
 
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
 	context := v2.Context{
 		Logger:      logger,
 		ID:          testID,
 		Cancelation: ctx,
 	}
 
-	events := make([]beat.Event, 0, eventLimit)
-	connector, eventsDone := newTestPipeline(eventLimit, &events)
-	done := make(chan struct{})
+	connector, events := newTestPipeline(eventLimit, collectEvents)
+	var out []beat.Event
+	if collectEvents {
+		out = make([]beat.Event, 0, eventLimit)
+	}
+	go func() {
+		// even if `collectEvents` is false we need to range the channel
+		// and wait until it's closed indicating that the input finished its job
+		for event := range events {
+			out = append(out, event)
+		}
+		cancel()
+	}()
 
 	return func(t testing.TB) []beat.Event {
-		go func() {
-			err := input.Run(context, connector)
-			assert.NoError(b, err)
-			close(done)
-		}()
+		err := input.Run(context, connector)
+		require.NoError(b, err)
 
-		<-eventsDone
-		cancel()
-		<-done // for more stable results we should wait until the full shutdown
-		return events
+		return out
 	}
 }
 
-func generateFile(t testing.TB, lineCount int) string {
+func generateFile(t testing.TB, dir string, lineCount int) string {
 	t.Helper()
-	dir := t.TempDir()
-	file, err := os.CreateTemp(dir, "lines.log")
+	file, err := os.CreateTemp(dir, "*")
 	require.NoError(t, err)
-
+	filename := file.Name()
 	for i := 0; i < lineCount; i++ {
-		fmt.Fprintf(file, "rather mediocre log line message - %d\n", i)
+		fmt.Fprintf(file, "rather mediocre log line message in %s - %d\n", filename, i)
 	}
-	filename := file.Name()
 	err = file.Close()
 	require.NoError(t, err)
 	return filename
@@ -161,15 +209,15 @@ func (s *testStore) CleanupInterval() time.Duration {
 	return time.Second
 }
 
-func newTestPipeline(eventLimit int, out *[]beat.Event) (pc beat.PipelineConnector, done <-chan struct{}) {
-	ch := make(chan struct{})
-	return &testPipeline{limit: eventLimit, done: ch, out: out}, ch
+func newTestPipeline(eventLimit int64, collectEvents bool) (pc beat.PipelineConnector, out <-chan beat.Event) {
+	ch := make(chan beat.Event, eventLimit)
+	return &testPipeline{limit: eventLimit, out: ch, collect: collectEvents}, ch
 }
 
 type testPipeline struct {
-	done  chan struct{}
-	limit int
-	out   *[]beat.Event
+	limit   int64
+	out     chan beat.Event
+	collect bool
 }
 
 func (p *testPipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {
@@ -184,13 +232,15 @@ type testClient struct {
 }
 
 func (c *testClient) Publish(event beat.Event) {
-	c.testPipeline.limit--
-	if c.testPipeline.limit < 0 {
+	newLimit := atomic.AddInt64(&c.testPipeline.limit, -1)
+	if newLimit < 0 {
 		return
 	}
-	*c.testPipeline.out = append(*c.testPipeline.out, event)
-	if c.testPipeline.limit == 0 {
-		close(c.testPipeline.done)
+	if c.testPipeline.collect {
+		c.testPipeline.out <- event
+	}
+	if newLimit == 0 {
+		close(c.testPipeline.out)
 	}
 }
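The `Publish` rewrite above replaces the plain decrement with `atomic.AddInt64` so that concurrent publishers can share one event budget: each caller gets a unique post-decrement value, callers that see a negative value drop the event, and exactly one caller observes zero and closes the channel. A standalone simplification of that counter-and-close pattern (events reduced to ints, driven from a single goroutine for brevity):

package main

import (
	"fmt"
	"sync/atomic"
)

// publish mirrors testClient.Publish: atomically take one unit of the
// remaining budget, drop events past the limit, and close the channel
// exactly once when the budget reaches zero.
func publish(limit *int64, out chan int, v int) {
	n := atomic.AddInt64(limit, -1)
	if n < 0 {
		return // budget exhausted: drop the event
	}
	out <- v
	if n == 0 {
		close(out) // only the caller that sees exactly 0 closes
	}
}

func main() {
	var limit int64 = 3
	out := make(chan int, limit) // buffered so publish never blocks
	for i := 0; i < 5; i++ {
		publish(&limit, out, i)
	}
	for v := range out {
		fmt.Println(v) // prints 0, 1, 2
	}
}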