Skip to content

Commit

Permalink
Filebeat: Fix random error on harvester close (elastic#21048)
Browse files Browse the repository at this point in the history
This fixes a race condition when a harvester is closed at the same time
that its source file size is being calculated.

Before this fix, `Error updating file size` errors are randomly printed, because
the file handle becomes closed inside the os.Stat call.
  • Loading branch information
adriansr authored Sep 10, 2020
1 parent 54ea284 commit af6222d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update documentation in the azure module filebeat. {pull}20815[20815]
- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908]
- Remove wrongly mapped `tls.client.server_name` from `fortinet/firewall` fileset. {pull}20983[20983]
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]

*Heartbeat*

Expand Down
11 changes: 9 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Harvester struct {

// shutdown handling
done chan struct{}
doneWg *sync.WaitGroup
stopOnce sync.Once
stopWg *sync.WaitGroup
stopLock sync.Mutex
Expand Down Expand Up @@ -138,6 +139,7 @@ func NewHarvester(
publishState: publishState,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
doneWg: &sync.WaitGroup{},
id: id,
outletFactory: outletFactory,
}
Expand Down Expand Up @@ -299,7 +301,11 @@ func (h *Harvester) Run() error {

logp.Info("Harvester started for file: %s", h.state.Source)

go h.monitorFileSize()
h.doneWg.Add(1)
go func() {
h.monitorFileSize()
h.doneWg.Done()
}()

for {
select {
Expand Down Expand Up @@ -378,7 +384,8 @@ func (h *Harvester) monitorFileSize() {
func (h *Harvester) stop() {
h.stopOnce.Do(func() {
close(h.done)

// Wait for goroutines monitoring h.done to terminate before closing source.
h.doneWg.Wait()
filesMetrics.Remove(h.id.String())
})
}
Expand Down

0 comments on commit af6222d

Please sign in to comment.