diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2a0ba34929bb..f3950c3fa0eb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -47,6 +47,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* - tcp/unix input: Stop accepting connections after socket is closed. {pull}29712[29712] +- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index 70db34797dbf..4aa858e572f8 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -21,7 +21,9 @@ const ( ) type listingInfo struct { - totObjects int + totObjects int + + mu sync.Mutex storedObjects int errorObjects int finalCheck bool @@ -105,11 +107,19 @@ func (s *states) Delete(id string) { func (s *states) IsListingFullyStored(listingID string) bool { info, _ := s.listingInfo.Load(listingID) listingInfo := info.(*listingInfo) + listingInfo.mu.Lock() + defer listingInfo.mu.Unlock() if listingInfo.finalCheck { return false } listingInfo.finalCheck = (listingInfo.storedObjects + listingInfo.errorObjects) == listingInfo.totObjects + + if (listingInfo.storedObjects + listingInfo.errorObjects) > listingInfo.totObjects { + s.log.Warnf("unexepected mixmatch between storedObjects (%d), errorObjects (%d) and totObjects (%d)", + listingInfo.storedObjects, listingInfo.errorObjects, listingInfo.totObjects) + } + return listingInfo.finalCheck } @@ -154,6 +164,9 @@ func (s *states) Update(newState state, listingID string) { // here we increase the number of stored object info, _ := s.listingInfo.Load(listingID) listingInfo := info.(*listingInfo) + + listingInfo.mu.Lock() + if newState.Stored { listingInfo.storedObjects++ } @@ -162,6 +175,8 @@ func (s *states) Update(newState state, listingID string) { listingInfo.errorObjects++ } + listingInfo.mu.Unlock() + if _, ok := s.statesByListingID[listingID]; !ok { s.statesByListingID[listingID] = make([]state, 0) }