Skip to content

Commit

Permalink
[filebeat][gcs] Fix for concurrency issues and context timeouts in th…
Browse files Browse the repository at this point in the history
…e GCS input (#35605)

* enhancement and bugfix for gcs input

* updated changelog

* addressed go lint errors

* updated changelog and addresses PR suggestions

* addressed PR suggestions

* updated PR to contain only bugfixes

* updated changelog
  • Loading branch information
ShourieG authored Jun 3, 2023
1 parent 1dcc62a commit 7788a6d
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]
- Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433]
- RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360]
- [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581]
- [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605]
- Fix filestream false positive log error "filestream input with ID 'xyz' already exists" {issue}31767[31767]

*Heartbeat*
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/gcs/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
Expand Down Expand Up @@ -127,7 +127,6 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)

var cp *Checkpoint
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/input_stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
Expand Down
60 changes: 21 additions & 39 deletions x-pack/filebeat/input/gcs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,13 @@ const (
)

func Test_StorageClient(t *testing.T) {
t.Skip("Flaky test: issue (could possibly affect this also) - https://github.com/elastic/beats/issues/34332")
tests := []struct {
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
checkJSON bool
isError error
unexpectedError error
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
checkJSON bool
isError error
}{
{
name: "SingleBucketWithPoll_NoErr",
Expand All @@ -67,7 +65,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "SingleBucketWithoutPoll_NoErr",
Expand All @@ -89,7 +86,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
unexpectedError: nil,
},
{
name: "TwoBucketsWithPoll_NoErr",
Expand All @@ -116,7 +112,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "TwoBucketsWithoutPoll_NoErr",
Expand All @@ -143,7 +138,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
unexpectedError: nil,
},
{
name: "SingleBucketWithPoll_InvalidBucketErr",
Expand All @@ -159,10 +153,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithoutPoll_InvalidBucketErr",
Expand All @@ -178,10 +171,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "TwoBucketsWithPoll_InvalidBucketErr",
Expand All @@ -200,10 +192,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithPoll_InvalidConfigValue",
Expand All @@ -219,10 +210,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "TwoBucketWithPoll_InvalidConfigValue",
Expand All @@ -241,10 +231,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "SingleBucketWithPoll_parseJSON",
Expand All @@ -267,7 +256,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json_parsed: true,
mock.Gcs_test_latest_object_data3_json_parsed: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadJSON",
Expand All @@ -289,7 +277,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_json[1]: true,
mock.BeatsFilesBucket_log_json[2]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadOctetStreamJSON",
Expand All @@ -310,7 +297,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json[0]: true,
mock.BeatsFilesBucket_multiline_json[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadNDJSON",
Expand All @@ -331,7 +317,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_ndjson[0]: true,
mock.BeatsFilesBucket_log_ndjson[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadMultilineGzJSON",
Expand All @@ -352,7 +337,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json_gz[0]: true,
mock.BeatsFilesBucket_multiline_json_gz[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadJSONWithRootAsArray",
Expand All @@ -375,7 +359,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_json_array[2]: true,
mock.BeatsFilesBucket_json_array[3]: true,
},
unexpectedError: context.Canceled,
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -462,7 +445,6 @@ func Test_StorageClient(t *testing.T) {
}
}
}
assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
})
}
}
Expand Down
29 changes: 15 additions & 14 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"unicode"

Expand All @@ -28,8 +27,6 @@ import (
)

type job struct {
// Mutex lock for concurrent publishes
mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
Expand Down Expand Up @@ -109,13 +106,13 @@ func (j *job) do(ctx context.Context, id string) {
Fields: fields,
}
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
// locks while data is being saved and published to avoid concurrent map read/writes
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
done()
}
}

Expand Down Expand Up @@ -216,19 +213,23 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// updates the offset after reading the file
// this avoids duplicates for the last read when resuming operation
offset = dec.InputOffset()
// locks while data is being saved and published to avoid concurrent map read/writes
var (
done func()
cp *Checkpoint
)
if !dec.More() {
// if this is the last object, then peform a complete state save
j.state.save(j.object.Name, j.object.Updated)
cp, done = j.state.saveForTx(j.object.Name, j.object.Updated)
} else {
// partially saves read state using offset
j.state.savePartial(j.object.Name, offset+relativeOffset)
cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset)
}
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
if err := j.publisher.Publish(evt, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
done()
}
return nil
}
Expand Down
18 changes: 7 additions & 11 deletions x-pack/filebeat/input/gcs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,11 @@ func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *stora
// Schedule, is responsible for fetching & scheduling jobs using the workerpool model
func (s *scheduler) schedule() error {
if !s.src.Poll {
ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
defer cancel()
return s.scheduleOnce(ctxWithTimeout)
return s.scheduleOnce(s.parentCtx)
}

for {
ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
defer cancel()

err := s.scheduleOnce(ctxWithTimeout)
err := s.scheduleOnce(s.parentCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -92,9 +87,9 @@ func (l *limiter) release() {
l.wg.Done()
}

func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
func (s *scheduler) scheduleOnce(ctx context.Context) error {
defer s.limiter.wait()
pager := s.fetchObjectPager(ctxWithTimeout, s.src.MaxWorkers)
pager := s.fetchObjectPager(ctx, *s.cfg.MaxWorkers)
for {
var objects []*storage.ObjectAttrs
nextPageToken, err := pager.NextPage(&objects)
Expand All @@ -107,7 +102,7 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
if !s.state.checkpoint().LatestEntryTime.IsZero() {
jobs = s.moveToLastSeenJob(jobs)
if len(s.state.checkpoint().FailedJobs) > 0 {
jobs = s.addFailedJobs(ctxWithTimeout, jobs)
jobs = s.addFailedJobs(ctx, jobs)
}
}

Expand All @@ -118,14 +113,15 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
s.limiter.acquire()
go func() {
defer s.limiter.release()
job.do(s.parentCtx, id)
job.do(ctx, id)
}()
}

if nextPageToken == "" {
break
}
}

return nil
}

Expand Down
26 changes: 16 additions & 10 deletions x-pack/filebeat/input/gcs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ func newState() *state {
}
}

// save, saves/updates the current state for cursor checkpoint
func (s *state) save(name string, lastModifiedOn time.Time) {
// saveForTx updates and returns the current state checkpoint, locks the state
// and returns an unlock function, done. The caller must call done when
// s and cp are no longer needed in a locked state. done may not be called
// more than once.
func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
s.mu.Lock()
delete(s.cp.LastProcessedOffset, name)
delete(s.cp.IsRootArray, name)
Expand All @@ -68,20 +71,23 @@ func (s *state) save(name string, lastModifiedOn time.Time) {
// clear entry if this is a failed job
delete(s.cp.FailedJobs, name)
}
s.mu.Unlock()
return s.cp, func() { s.mu.Unlock() }
}

// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
// savePartialForTx partially updates and returns the current state checkpoint, locks the state
// and returns an unlock function, done. The caller must call done when
// s and cp are no longer needed in a locked state. done may not be called
// more than once.
func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) {
s.mu.Lock()
s.cp.IsRootArray[name] = true
s.mu.Unlock()
s.cp.LastProcessedOffset[name] = offset
return s.cp, func() { s.mu.Unlock() }
}

// savePartial, partially saves/updates the current state for cursor checkpoint
func (s *state) savePartial(name string, offset int64) {
// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
s.mu.Lock()
s.cp.LastProcessedOffset[name] = offset
s.cp.IsRootArray[name] = true
s.mu.Unlock()
}

Expand Down

0 comments on commit 7788a6d

Please sign in to comment.