Add tests for fileProspector in filestream input (elastic#21712)
## What does this PR do?

This PR adds tests covering how `fileProspector` handles Create, Write, and Delete operations. To make the `Prospector` testable, I changed `HarvesterGroup` to an interface so it can be mocked; a short sketch of the idea follows.
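
A minimal sketch of that approach, assuming this package's `Source` type and the `input.Context` from `filebeat/input/v2`; `fakeHarvesterGroup` is an illustrative name (the new tests use `testHarvesterGroup`, shown in full in `prospector_test.go` below):

```go
// HarvesterGroup is now an interface, so tests can swap in a recorder.
type HarvesterGroup interface {
	Run(input.Context, Source) error
}

// fakeHarvesterGroup records which sources the prospector asks to run
// instead of starting real harvesters.
type fakeHarvesterGroup struct {
	names []string
}

func (f *fakeHarvesterGroup) Run(_ input.Context, s Source) error {
	f.names = append(f.names, s.Name())
	return nil
}
```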

During testing, an issue with the path identifier showed up when a file was deleted: the identifier built `Name` from `NewPath` even for delete events, producing an incorrect value. It now falls back to `OldPath` for deleted files.

(cherry picked from commit 2151d15)
kvch committed Oct 16, 2020
1 parent 5498661 commit 0ec309d
Showing 6 changed files with 218 additions and 14 deletions.
6 changes: 5 additions & 1 deletion filebeat/input/filestream/identifier.go
@@ -116,11 +116,15 @@ func newPathIdentifier(_ *common.Config) (fileIdentifier, error) {
}

func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
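// Use the new path for the source name, falling back to the old path for delete events.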
path := e.NewPath
if e.Op == loginp.OpDelete {
path = e.OldPath
}
return fileSource{
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
name: pluginName + identitySep + p.name + identitySep + e.NewPath,
name: pluginName + identitySep + p.name + identitySep + path,
identifierGenerator: p.name,
}
}
10 changes: 7 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/harvester.go
@@ -43,7 +43,11 @@ type Harvester interface {

// HarvesterGroup is responsible for running the
// Harvesters started by the Prospector.
type HarvesterGroup struct {
type HarvesterGroup interface {
Run(input.Context, Source) error
}

type defaultHarvesterGroup struct {
manager *InputManager
readers map[string]context.CancelFunc
pipeline beat.PipelineConnector
@@ -54,7 +58,7 @@ type HarvesterGroup struct {
}

// Run starts the Harvester for a Source.
func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error {
func (hg *defaultHarvesterGroup) Run(ctx input.Context, s Source) error {
log := ctx.Logger.With("source", s.Name())
log.Debug("Starting harvester for file")

@@ -111,7 +115,7 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error {
}

// Cancel stops the running Harvester for a given Source.
func (hg *HarvesterGroup) Cancel(s Source) error {
func (hg *defaultHarvesterGroup) Cancel(s Source) error {
if cancel, ok := hg.readers[s.Name()]; ok {
cancel()
return nil
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/input.go
@@ -59,7 +59,7 @@ func (inp *managedInput) Run(
store.Retain()
defer store.Release()

hg := &HarvesterGroup{
hg := &defaultHarvesterGroup{
pipeline: pipeline,
readers: make(map[string]context.CancelFunc),
manager: inp.manager,
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/prospector.go
@@ -28,7 +28,7 @@ import (
type Prospector interface {
// Run starts the event loop and handles the incoming events
// either by starting/stopping a harvester, or updating the statestore.
Run(input.Context, *statestore.Store, *HarvesterGroup)
Run(input.Context, *statestore.Store, HarvesterGroup)
// Test checks if the Prospector is able to run the configuration
// specified by the user.
Test() error
15 changes: 7 additions & 8 deletions filebeat/input/filestream/prospector.go
@@ -72,7 +72,7 @@ func newFileProspector(
}

// Run starts the fileProspector which accepts FS events from a file watcher.
func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) {
func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.HarvesterGroup) {
log := ctx.Logger.With("prospector", prospectorDebugKey)
log.Debug("Starting prospector")
defer log.Debug("Prospector has stopped")
@@ -100,8 +100,12 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.

src := p.identifier.GetSource(fe)
switch fe.Op {
case loginp.OpCreate:
log.Debugf("A new file %s has been found", fe.NewPath)
case loginp.OpCreate, loginp.OpWrite:
if fe.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", fe.NewPath)
} else if fe.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", fe.NewPath)
}

if p.ignoreOlder > 0 {
now := time.Now()
@@ -113,11 +117,6 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.

hg.Run(ctx, src)

case loginp.OpWrite:
log.Debugf("File %s has been updated", fe.NewPath)

hg.Run(ctx, src)

case loginp.OpDelete:
log.Debugf("File %s has been removed", fe.OldPath)

197 changes: 197 additions & 0 deletions filebeat/input/filestream/prospector_test.go
@@ -0,0 +1,197 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/go-concert/unison"
)

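// TestProspectorNewAndUpdatedFiles checks that create and write events lead to
// a harvester being started for the source, and that events for files older
// than ignore_older are skipped.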
func TestProspectorNewAndUpdatedFiles(t *testing.T) {
minuteAgo := time.Now().Add(-1 * time.Minute)

testCases := map[string]struct {
events []loginp.FSEvent
ignoreOlder time.Duration
expectedSources []string
}{
"two new files": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"},
loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file"},
},
expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"},
},
"one updated file": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"},
},
expectedSources: []string{"filestream::path::/path/to/file"},
},
"old files with ignore older configured": {
events: []loginp.FSEvent{
loginp.FSEvent{
Op: loginp.OpCreate,
NewPath: "/path/to/file",
Info: testFileInfo{"/path/to/file", 5, minuteAgo},
},
loginp.FSEvent{
Op: loginp.OpWrite,
NewPath: "/path/to/other/file",
Info: testFileInfo{"/path/to/other/file", 5, minuteAgo},
},
},
ignoreOlder: 10 * time.Second,
expectedSources: []string{},
},
"newer files with ignore older": {
events: []loginp.FSEvent{
loginp.FSEvent{
Op: loginp.OpCreate,
NewPath: "/path/to/file",
Info: testFileInfo{"/path/to/file", 5, minuteAgo},
},
loginp.FSEvent{
Op: loginp.OpWrite,
NewPath: "/path/to/other/file",
Info: testFileInfo{"/path/to/other/file", 5, minuteAgo},
},
},
ignoreOlder: 5 * time.Minute,
expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"},
},
}

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
p := fileProspector{
filewatcher: &mockFileWatcher{events: test.events},
identifier: mustPathIdentifier(),
ignoreOlder: test.ignoreOlder,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
hg := getTestHarvesterGroup()

p.Run(ctx, testStateStore(), hg)

assert.ElementsMatch(t, hg.encounteredNames, test.expectedSources)
})
}
}

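// TestProspectorDeletedFile checks that the state of a removed file is deleted
// from the store only when clean_removed is enabled.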
func TestProspectorDeletedFile(t *testing.T) {
testCases := map[string]struct {
events []loginp.FSEvent
cleanRemoved bool
}{
"one deleted file without clean removed": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"},
},
cleanRemoved: false,
},
"one deleted file with clean removed": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"},
},
cleanRemoved: true,
},
}

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
p := fileProspector{
filewatcher: &mockFileWatcher{events: test.events},
identifier: mustPathIdentifier(),
cleanRemoved: test.cleanRemoved,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}

testStore := testStateStore()
testStore.Set("filestream::path::/path/to/file", nil)

p.Run(ctx, testStore, getTestHarvesterGroup())

has, err := testStore.Has("filestream::path::/path/to/file")
if err != nil {
t.Fatal(err)
}

if test.cleanRemoved {
assert.False(t, has)
} else {
assert.True(t, has)
}
})
}
}

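// testHarvesterGroup implements loginp.HarvesterGroup, recording the name of
// every source it is asked to run instead of starting a harvester.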
type testHarvesterGroup struct {
encounteredNames []string
}

func getTestHarvesterGroup() *testHarvesterGroup { return &testHarvesterGroup{make([]string, 0)} }

func (t *testHarvesterGroup) Run(_ input.Context, s loginp.Source) error {
t.encounteredNames = append(t.encounteredNames, s.Name())
return nil
}

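// mockFileWatcher replays a fixed list of FS events and returns the zero
// FSEvent once all of them have been consumed.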
type mockFileWatcher struct {
nextIdx int
events []loginp.FSEvent
}

func (m *mockFileWatcher) Event() loginp.FSEvent {
if len(m.events) == m.nextIdx {
return loginp.FSEvent{}
}
evt := m.events[m.nextIdx]
m.nextIdx++
return evt
}
func (m *mockFileWatcher) Run(_ unison.Canceler) { return }

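// testStateStore returns an in-memory statestore.Store registered under the
// plugin name.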
func testStateStore() *statestore.Store {
s, _ := statestore.NewRegistry(storetest.NewMemoryStoreBackend()).Get(pluginName)
return s
}

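// mustPathIdentifier returns a path-based fileIdentifier, panicking if it
// cannot be created.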
func mustPathIdentifier() fileIdentifier {
pathIdentifier, err := newPathIdentifier(nil)
if err != nil {
panic(err)
}
return pathIdentifier
}
