From 6a95dc44c8db6dec5f29cbd93020b646c918aa0d Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 29 May 2023 16:07:06 +0530 Subject: [PATCH 1/7] enhancement and bugfix for gcs input --- x-pack/filebeat/input/gcs/input.go | 5 +- x-pack/filebeat/input/gcs/input_stateless.go | 4 +- x-pack/filebeat/input/gcs/input_test.go | 60 +++++++------------- x-pack/filebeat/input/gcs/job.go | 16 ++++-- x-pack/filebeat/input/gcs/scheduler.go | 43 ++++++++------ x-pack/filebeat/input/gcs/state.go | 9 +++ 6 files changed, 71 insertions(+), 66 deletions(-) diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 9bc897f64fc..8ccb1878750 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -127,7 +127,6 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName) log.Infof("Running google cloud storage for project: %s", input.config.ProjectId) - var cp *Checkpoint if !cursor.IsNew() { if err := cursor.Unpack(&cp); err != nil { @@ -156,7 +155,7 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, // Since we are only reading, the operation is always idempotent storage.WithPolicy(storage.RetryAlways), ) - scheduler := newScheduler(ctx, publisher, bucket, currentSource, &input.config, st, log) + scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, log) - return scheduler.schedule() + return scheduler.schedule(ctx) } diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index b6b3b14cda7..53f28a70ee4 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -79,9 +79,9 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher storage.WithPolicy(storage.RetryAlways), ) - scheduler := newScheduler(ctx, pub, bkt, currentSource, &in.config, st, log) + scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log) - return scheduler.schedule() + return scheduler.schedule(ctx) } return nil } diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 1cb658377ab..bd9028d6bf9 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -37,15 +37,13 @@ const ( ) func Test_StorageClient(t *testing.T) { - t.Skip("Flaky test: issue (could possibly affect this also) - https://github.com/elastic/beats/issues/34332") tests := []struct { - name string - baseConfig map[string]interface{} - mockHandler func() http.Handler - expected map[string]bool - checkJSON bool - isError error - unexpectedError error + name string + baseConfig map[string]interface{} + mockHandler func() http.Handler + expected map[string]bool + checkJSON bool + isError error }{ { name: "SingleBucketWithPoll_NoErr", @@ -67,7 +65,6 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_new_object_data3_json: true, mock.Gcs_test_new_object_docs_ata_json: true, }, - unexpectedError: context.Canceled, }, { name: "SingleBucketWithoutPoll_NoErr", @@ -89,7 +86,6 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_new_object_data3_json: true, mock.Gcs_test_new_object_docs_ata_json: true, }, - unexpectedError: nil, }, { name: "TwoBucketsWithPoll_NoErr", @@ -116,7 +112,6 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_latest_object_ata_json: true, mock.Gcs_test_latest_object_data3_json: true, }, - 
unexpectedError: context.Canceled, }, { name: "TwoBucketsWithoutPoll_NoErr", @@ -143,7 +138,6 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_latest_object_ata_json: true, mock.Gcs_test_latest_object_data3_json: true, }, - unexpectedError: nil, }, { name: "SingleBucketWithPoll_InvalidBucketErr", @@ -159,10 +153,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.GCSServer, - expected: map[string]bool{}, - isError: errors.New("storage: bucket doesn't exist"), - unexpectedError: nil, + mockHandler: mock.GCSServer, + expected: map[string]bool{}, + isError: errors.New("storage: bucket doesn't exist"), }, { name: "SingleBucketWithoutPoll_InvalidBucketErr", @@ -178,10 +171,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.GCSServer, - expected: map[string]bool{}, - isError: errors.New("storage: bucket doesn't exist"), - unexpectedError: nil, + mockHandler: mock.GCSServer, + expected: map[string]bool{}, + isError: errors.New("storage: bucket doesn't exist"), }, { name: "TwoBucketsWithPoll_InvalidBucketErr", @@ -200,10 +192,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.GCSServer, - expected: map[string]bool{}, - isError: errors.New("storage: bucket doesn't exist"), - unexpectedError: nil, + mockHandler: mock.GCSServer, + expected: map[string]bool{}, + isError: errors.New("storage: bucket doesn't exist"), }, { name: "SingleBucketWithPoll_InvalidConfigValue", @@ -219,10 +210,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.GCSServer, - expected: map[string]bool{}, - isError: errors.New("requires value <= 5000 accessing 'max_workers'"), - unexpectedError: nil, + mockHandler: mock.GCSServer, + expected: map[string]bool{}, + isError: errors.New("requires value <= 5000 accessing 'max_workers'"), }, { name: "TwoBucketWithPoll_InvalidConfigValue", @@ -241,10 +231,9 @@ func Test_StorageClient(t *testing.T) { }, }, }, - mockHandler: mock.GCSServer, - expected: map[string]bool{}, - isError: errors.New("requires value <= 5000 accessing 'max_workers'"), - unexpectedError: nil, + mockHandler: mock.GCSServer, + expected: map[string]bool{}, + isError: errors.New("requires value <= 5000 accessing 'max_workers'"), }, { name: "SingleBucketWithPoll_parseJSON", @@ -267,7 +256,6 @@ func Test_StorageClient(t *testing.T) { mock.Gcs_test_latest_object_ata_json_parsed: true, mock.Gcs_test_latest_object_data3_json_parsed: true, }, - unexpectedError: context.Canceled, }, { name: "ReadJSON", @@ -289,7 +277,6 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesBucket_log_json[1]: true, mock.BeatsFilesBucket_log_json[2]: true, }, - unexpectedError: context.Canceled, }, { name: "ReadOctetStreamJSON", @@ -310,7 +297,6 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesBucket_multiline_json[0]: true, mock.BeatsFilesBucket_multiline_json[1]: true, }, - unexpectedError: context.Canceled, }, { name: "ReadNDJSON", @@ -331,7 +317,6 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesBucket_log_ndjson[0]: true, mock.BeatsFilesBucket_log_ndjson[1]: true, }, - unexpectedError: context.Canceled, }, { name: "ReadMultilineGzJSON", @@ -352,7 +337,6 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesBucket_multiline_json_gz[0]: true, mock.BeatsFilesBucket_multiline_json_gz[1]: true, }, - unexpectedError: context.Canceled, }, { name: "ReadJSONWithRootAsArray", @@ -375,7 +359,6 @@ func Test_StorageClient(t *testing.T) { mock.BeatsFilesBucket_json_array[2]: true, mock.BeatsFilesBucket_json_array[3]: true, }, - 
unexpectedError: context.Canceled, }, } for _, tt := range tests { @@ -462,7 +445,6 @@ func Test_StorageClient(t *testing.T) { } } } - assert.ErrorIs(t, g.Wait(), tt.unexpectedError) }) } } diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index 118e89287ac..0268af75912 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -109,12 +109,14 @@ func (j *job) do(ctx context.Context, id string) { Fields: fields, } event.SetID(objectID(j.hash, 0)) - j.state.save(j.object.Name, j.object.Updated) - // locks while data is being published to avoid concurrent map read/writes + // locks while data is being saved and published to avoid concurrent map read/writes j.mu.Lock() - if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil { + j.state.save(j.object.Name, j.object.Updated) + cp, done := j.state.checkpointTxn() + if err := j.publisher.Publish(event, cp); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + done() j.mu.Unlock() } } @@ -216,6 +218,8 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // updates the offset after reading the file // this avoids duplicates for the last read when resuming operation offset = dec.InputOffset() + // locks while data is being saved and published to avoid concurrent map read/writes + j.mu.Lock() if !dec.More() { // if this is the last object, then peform a complete state save j.state.save(j.object.Name, j.object.Updated) @@ -223,11 +227,11 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // partially saves read state using offset j.state.savePartial(j.object.Name, offset+relativeOffset) } - // locks while data is being published to avoid concurrent map read/writes - j.mu.Lock() - if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil { + cp, done := j.state.checkpointTxn() + if err := j.publisher.Publish(evt, cp); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + done() j.mu.Unlock() } return nil diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 7feb57f7c1e..4b5a5662f8b 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -26,7 +26,6 @@ type limiter struct { limit chan struct{} } type scheduler struct { - parentCtx context.Context publisher cursor.Publisher bucket *storage.BucketHandle src *Source @@ -37,11 +36,10 @@ type scheduler struct { } // newScheduler, returns a new scheduler instance -func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *storage.BucketHandle, src *Source, cfg *config, +func newScheduler(publisher cursor.Publisher, bucket *storage.BucketHandle, src *Source, cfg *config, state *state, log *logp.Logger, ) *scheduler { return &scheduler{ - parentCtx: ctx, publisher: publisher, bucket: bucket, src: src, @@ -53,23 +51,18 @@ func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *stora } // Schedule, is responsible for fetching & scheduling jobs using the workerpool model -func (s *scheduler) schedule() error { +func (s *scheduler) schedule(ctx context.Context) error { if !s.src.Poll { - ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut) - defer cancel() - return s.scheduleOnce(ctxWithTimeout) + return s.scheduleOnce(ctx) } for { - ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut) - defer cancel() - - err := 
s.scheduleOnce(ctxWithTimeout) + err := s.scheduleOnce(ctx) if err != nil { return err } - err = timed.Wait(s.parentCtx, s.src.PollInterval) + err = timed.Wait(ctx, s.src.PollInterval) if err != nil { return err } @@ -92,40 +85,54 @@ func (l *limiter) release() { l.wg.Done() } -func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error { +func (s *scheduler) scheduleOnce(ctx context.Context) error { defer s.limiter.wait() - pager := s.fetchObjectPager(ctxWithTimeout, s.src.MaxWorkers) + pager := s.fetchObjectPager(ctx, *s.cfg.MaxWorkers) + numObs := 0 + numJobs := 0 for { var objects []*storage.ObjectAttrs nextPageToken, err := pager.NextPage(&objects) if err != nil { return err } + numObs += len(objects) jobs := s.createJobs(objects, s.log) + s.log.Debugf("scheduler: %d objects fetched for current batch", len(objects)) // If previous checkpoint was saved then look up starting point for new jobs if !s.state.checkpoint().LatestEntryTime.IsZero() { jobs = s.moveToLastSeenJob(jobs) if len(s.state.checkpoint().FailedJobs) > 0 { - jobs = s.addFailedJobs(ctxWithTimeout, jobs) + jobs = s.addFailedJobs(ctx, jobs) } } + s.log.Debugf("scheduler: %d jobs scheduled for current batch", len(jobs)) // distributes jobs among workers with the help of a limiter for i, job := range jobs { + numJobs++ id := fetchJobID(i, s.src.BucketName, job.Name()) job := job s.limiter.acquire() go func() { defer s.limiter.release() - job.do(s.parentCtx, id) + job.do(ctx, id) }() } + s.log.Debugf("scheduler: total objects read till now: %d", numObs) + s.log.Debugf("scheduler: total jobs scheduled till now: %d", numJobs) + if len(jobs) > 0 { + s.log.Debugf("scheduler: first job in current batch: %s", jobs[0].Name()) + s.log.Debugf("scheduler: last job in current batch: %s", jobs[len(jobs)-1].Name()) + } + if nextPageToken == "" { break } } + return nil } @@ -216,6 +223,8 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { jobMap[j.Name()] = true } + s.log.Debugf("scheduler: %d failed jobs found", len(s.state.checkpoint().FailedJobs)) + fj := 0 for name := range s.state.checkpoint().FailedJobs { if !jobMap[name] { obj, err := s.bucket.Object(name).Attrs(ctx) @@ -226,6 +235,8 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { objectURI := "gs://" + s.src.BucketName + "/" + obj.Name job := newJob(s.bucket, obj, objectURI, s.state, s.src, s.publisher, s.log, true) jobs = append(jobs, job) + s.log.Debugf("scheduler: adding failed job number %d with name %s to job current list", fj, job.Name()) + fj++ } } return jobs diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index 6b2a269481f..31eac385699 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -123,3 +123,12 @@ func (s *state) setCheckpoint(chkpt *Checkpoint) { func (s *state) checkpoint() *Checkpoint { return s.cp } + +// checkpointTxn, returns the current state checkpoint, locks the state +// and returns an unlock function, done. The caller must call done when +// cp is no longer needed in a locked state. done may not be called +// more than once. 
+func (s *state) checkpointTxn() (cp *Checkpoint, done func()) { + s.mu.Lock() + return s.cp, func() { s.mu.Unlock() } +} From 95a36c4652da3e57d8ff385f638d2631f16a16d4 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 29 May 2023 16:22:19 +0530 Subject: [PATCH 2/7] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b061eb26315..05333115649 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -124,6 +124,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] - RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360] - [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581] +- [Gcs Input] - Enhancement and bugfix for concurrency issues, flakey tests and context timing out. {pull}35605[35605] *Heartbeat* From f1743088cef9c56356b716a2cbda0e9ff5dde6a9 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 29 May 2023 17:51:54 +0530 Subject: [PATCH 3/7] addressed go lint errors --- x-pack/filebeat/input/gcs/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 8ccb1878750..81ed1b82210 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -9,7 +9,7 @@ import ( "time" "cloud.google.com/go/storage" - "github.com/googleapis/gax-go/v2" + gax "github.com/googleapis/gax-go/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" From a03eeaede2c9823fbbf8419ab54b90ad6ab5e271 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 30 May 2023 11:08:04 +0530 Subject: [PATCH 4/7] updated changelog and addresses PR suggestions --- CHANGELOG.next.asciidoc | 2 +- x-pack/filebeat/input/gcs/input_stateless.go | 2 +- x-pack/filebeat/input/gcs/job.go | 19 ++++------- x-pack/filebeat/input/gcs/state.go | 35 +++++++++----------- 4 files changed, 25 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 05333115649..8881d9f90b4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -124,7 +124,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] - RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360] - [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581] -- [Gcs Input] - Enhancement and bugfix for concurrency issues, flakey tests and context timing out. {pull}35605[35605] +- [GCS Input] - Enhancement and bugfix for concurrency issues, flakey tests and context timing out. 
{pull}35605[35605] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index 53f28a70ee4..d31f0875262 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -9,7 +9,7 @@ import ( "time" "cloud.google.com/go/storage" - "github.com/googleapis/gax-go/v2" + gax "github.com/googleapis/gax-go/v2" v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index 0268af75912..347f2dee16a 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -15,7 +15,6 @@ import ( "errors" "fmt" "io" - "sync" "time" "unicode" @@ -28,8 +27,6 @@ import ( ) type job struct { - // Mutex lock for concurrent publishes - mu sync.Mutex // gcs bucket handle bucket *storage.BucketHandle // gcs object attribute struct @@ -110,14 +107,12 @@ func (j *job) do(ctx context.Context, id string) { } event.SetID(objectID(j.hash, 0)) // locks while data is being saved and published to avoid concurrent map read/writes - j.mu.Lock() - j.state.save(j.object.Name, j.object.Updated) - cp, done := j.state.checkpointTxn() + cp, done := j.state.saveForTx(j.object.Name, j.object.Updated) if err := j.publisher.Publish(event, cp); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + // unlocks after data is saved and published done() - j.mu.Unlock() } } @@ -219,20 +214,20 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // this avoids duplicates for the last read when resuming operation offset = dec.InputOffset() // locks while data is being saved and published to avoid concurrent map read/writes - j.mu.Lock() + var done func() + var cp *Checkpoint if !dec.More() { // if this is the last object, then peform a complete state save - j.state.save(j.object.Name, j.object.Updated) + cp, done = j.state.saveForTx(j.object.Name, j.object.Updated) } else { // partially saves read state using offset - j.state.savePartial(j.object.Name, offset+relativeOffset) + cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset) } - cp, done := j.state.checkpointTxn() if err := j.publisher.Publish(evt, cp); err != nil { j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err) } + // unlocks after data is saved and published done() - j.mu.Unlock() } return nil } diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index 31eac385699..afa20e5d52d 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -47,8 +47,11 @@ func newState() *state { } } -// save, saves/updates the current state for cursor checkpoint -func (s *state) save(name string, lastModifiedOn time.Time) { +// saveForTx updates and returns the current state checkpoint, locks the state +// and returns an unlock function, done. The caller must call done when +// s and cp are no longer needed in a locked state. done may not be called +// more than once. 
+func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) { s.mu.Lock() delete(s.cp.LastProcessedOffset, name) delete(s.cp.IsRootArray, name) @@ -68,20 +71,23 @@ func (s *state) save(name string, lastModifiedOn time.Time) { // clear entry if this is a failed job delete(s.cp.FailedJobs, name) } - s.mu.Unlock() + return s.cp, func() { s.mu.Unlock() } } -// setRootArray, sets boolean true for objects that have their roots defined as an array type -func (s *state) setRootArray(name string) { +// savePartialForTx partially updates and returns the current state checkpoint, locks the state +// and returns an unlock function, done. The caller must call done when +// s and cp are no longer needed in a locked state. done may not be called +// more than once. +func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) { s.mu.Lock() - s.cp.IsRootArray[name] = true - s.mu.Unlock() + s.cp.LastProcessedOffset[name] = offset + return s.cp, func() { s.mu.Unlock() } } -// savePartial, partially saves/updates the current state for cursor checkpoint -func (s *state) savePartial(name string, offset int64) { +// setRootArray, sets boolean true for objects that have their roots defined as an array type +func (s *state) setRootArray(name string) { s.mu.Lock() - s.cp.LastProcessedOffset[name] = offset + s.cp.IsRootArray[name] = true s.mu.Unlock() } @@ -123,12 +129,3 @@ func (s *state) setCheckpoint(chkpt *Checkpoint) { func (s *state) checkpoint() *Checkpoint { return s.cp } - -// checkpointTxn, returns the current state checkpoint, locks the state -// and returns an unlock function, done. The caller must call done when -// cp is no longer needed in a locked state. done may not be called -// more than once. -func (s *state) checkpointTxn() (cp *Checkpoint, done func()) { - s.mu.Lock() - return s.cp, func() { s.mu.Unlock() } -} From a7f1e11984a55f013c2300c49e2cb5a23fdddb8b Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 30 May 2023 13:27:37 +0530 Subject: [PATCH 5/7] addressed PR suggestions --- CHANGELOG.next.asciidoc | 6 +++++- x-pack/filebeat/input/gcs/job.go | 6 ++++-- x-pack/filebeat/input/gcs/scheduler.go | 11 +++++------ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8881d9f90b4..d63143acbdf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -124,7 +124,11 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] - RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360] - [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581] -- [GCS Input] - Enhancement and bugfix for concurrency issues, flakey tests and context timing out. {pull}35605[35605] +- [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual + bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. + Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605] +- [GCS Input] - Enhanced the input by adding debug logs and improved the context passing mechanism by removing them from struct params + and passing them as function arguments. 
{pull}35605[35605] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/job.go b/x-pack/filebeat/input/gcs/job.go index 347f2dee16a..edcb7fe976a 100644 --- a/x-pack/filebeat/input/gcs/job.go +++ b/x-pack/filebeat/input/gcs/job.go @@ -214,8 +214,10 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er // this avoids duplicates for the last read when resuming operation offset = dec.InputOffset() // locks while data is being saved and published to avoid concurrent map read/writes - var done func() - var cp *Checkpoint + var ( + done func() + cp *Checkpoint + ) if !dec.More() { // if this is the last object, then peform a complete state save cp, done = j.state.saveForTx(j.object.Name, j.object.Updated) diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 4b5a5662f8b..6ea5f8badf6 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -121,11 +121,9 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { }() } - s.log.Debugf("scheduler: total objects read till now: %d", numObs) - s.log.Debugf("scheduler: total jobs scheduled till now: %d", numJobs) + s.log.Debugf("scheduler: total objects read till now: %d\nscheduler: total jobs scheduled till now: %d", numObs, numJobs) if len(jobs) > 0 { - s.log.Debugf("scheduler: first job in current batch: %s", jobs[0].Name()) - s.log.Debugf("scheduler: last job in current batch: %s", jobs[len(jobs)-1].Name()) + s.log.Debugf("scheduler: first job in current batch: %s\nscheduler: last job in current batch: %s", jobs[0].Name(), jobs[len(jobs)-1].Name()) } if nextPageToken == "" { @@ -223,9 +221,10 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { jobMap[j.Name()] = true } - s.log.Debugf("scheduler: %d failed jobs found", len(s.state.checkpoint().FailedJobs)) + failedJobs := s.state.checkpoint().FailedJobs + s.log.Debugf("scheduler: %d failed jobs found", len(failedJobs)) fj := 0 - for name := range s.state.checkpoint().FailedJobs { + for name := range failedJobs { if !jobMap[name] { obj, err := s.bucket.Object(name).Attrs(ctx) if err != nil { From af2b1a08a4f5fcf78f1287725f85b5c873be93dd Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 31 May 2023 12:55:35 +0530 Subject: [PATCH 6/7] updated PR to contain only bugfixes --- CHANGELOG.next.asciidoc | 2 -- x-pack/filebeat/input/gcs/input.go | 4 +-- x-pack/filebeat/input/gcs/input_stateless.go | 4 +-- x-pack/filebeat/input/gcs/scheduler.go | 30 ++++++-------------- 4 files changed, 12 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d63143acbdf..87fde8de9fa 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -127,8 +127,6 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff] - [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605] -- [GCS Input] - Enhanced the input by adding debug logs and improved the context passing mechanism by removing them from struct params - and passing them as function arguments. 
{pull}35605[35605] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 81ed1b82210..b5e4c23b8b2 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -155,7 +155,7 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, // Since we are only reading, the operation is always idempotent storage.WithPolicy(storage.RetryAlways), ) - scheduler := newScheduler(publisher, bucket, currentSource, &input.config, st, log) + scheduler := newScheduler(ctx, publisher, bucket, currentSource, &input.config, st, log) - return scheduler.schedule(ctx) + return scheduler.schedule() } diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index d31f0875262..b81ccf79b12 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -79,9 +79,9 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher storage.WithPolicy(storage.RetryAlways), ) - scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, log) + scheduler := newScheduler(ctx, pub, bkt, currentSource, &in.config, st, log) - return scheduler.schedule(ctx) + return scheduler.schedule() } return nil } diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 6ea5f8badf6..a5da0b9576d 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -26,6 +26,7 @@ type limiter struct { limit chan struct{} } type scheduler struct { + parentCtx context.Context publisher cursor.Publisher bucket *storage.BucketHandle src *Source @@ -36,10 +37,11 @@ type scheduler struct { } // newScheduler, returns a new scheduler instance -func newScheduler(publisher cursor.Publisher, bucket *storage.BucketHandle, src *Source, cfg *config, +func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *storage.BucketHandle, src *Source, cfg *config, state *state, log *logp.Logger, ) *scheduler { return &scheduler{ + parentCtx: ctx, publisher: publisher, bucket: bucket, src: src, @@ -51,18 +53,18 @@ func newScheduler(publisher cursor.Publisher, bucket *storage.BucketHandle, src } // Schedule, is responsible for fetching & scheduling jobs using the workerpool model -func (s *scheduler) schedule(ctx context.Context) error { +func (s *scheduler) schedule() error { if !s.src.Poll { - return s.scheduleOnce(ctx) + return s.scheduleOnce(s.parentCtx) } for { - err := s.scheduleOnce(ctx) + err := s.scheduleOnce(s.parentCtx) if err != nil { return err } - err = timed.Wait(ctx, s.src.PollInterval) + err = timed.Wait(s.parentCtx, s.src.PollInterval) if err != nil { return err } @@ -88,17 +90,13 @@ func (l *limiter) release() { func (s *scheduler) scheduleOnce(ctx context.Context) error { defer s.limiter.wait() pager := s.fetchObjectPager(ctx, *s.cfg.MaxWorkers) - numObs := 0 - numJobs := 0 for { var objects []*storage.ObjectAttrs nextPageToken, err := pager.NextPage(&objects) if err != nil { return err } - numObs += len(objects) jobs := s.createJobs(objects, s.log) - s.log.Debugf("scheduler: %d objects fetched for current batch", len(objects)) // If previous checkpoint was saved then look up starting point for new jobs if !s.state.checkpoint().LatestEntryTime.IsZero() { @@ -107,11 +105,9 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { jobs = s.addFailedJobs(ctx, jobs) } } - s.log.Debugf("scheduler: %d jobs scheduled for current batch", len(jobs)) // 
distributes jobs among workers with the help of a limiter for i, job := range jobs { - numJobs++ id := fetchJobID(i, s.src.BucketName, job.Name()) job := job s.limiter.acquire() @@ -121,11 +117,6 @@ func (s *scheduler) scheduleOnce(ctx context.Context) error { }() } - s.log.Debugf("scheduler: total objects read till now: %d\nscheduler: total jobs scheduled till now: %d", numObs, numJobs) - if len(jobs) > 0 { - s.log.Debugf("scheduler: first job in current batch: %s\nscheduler: last job in current batch: %s", jobs[0].Name(), jobs[len(jobs)-1].Name()) - } - if nextPageToken == "" { break } @@ -221,10 +212,7 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { jobMap[j.Name()] = true } - failedJobs := s.state.checkpoint().FailedJobs - s.log.Debugf("scheduler: %d failed jobs found", len(failedJobs)) - fj := 0 - for name := range failedJobs { + for name := range s.state.checkpoint().FailedJobs { if !jobMap[name] { obj, err := s.bucket.Object(name).Attrs(ctx) if err != nil { @@ -234,8 +222,6 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { objectURI := "gs://" + s.src.BucketName + "/" + obj.Name job := newJob(s.bucket, obj, objectURI, s.state, s.src, s.publisher, s.log, true) jobs = append(jobs, job) - s.log.Debugf("scheduler: adding failed job number %d with name %s to job current list", fj, job.Name()) - fj++ } } return jobs From cdf54ca55e3e21f62fac03b8644070f036ecaf5f Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 1 Jun 2023 13:43:05 +0530 Subject: [PATCH 7/7] updated changelog --- CHANGELOG.next.asciidoc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3e58f7b8d54..ec2365a1b8e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -124,9 +124,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] - RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360] - [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581] -- [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual - bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. - Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605] +- [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suit. {pull}35605[35605] - Fix filestream false positive log error "filestream input with ID 'xyz' already exists" {issue}31767[31767] *Heartbeat*
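
Note (illustrative only, not part of the patch series): the central concurrency change above moves the job-level mutex into state.go, where saveForTx/savePartialForTx update the cursor checkpoint under the state's own lock and hand back a done func, so the event is published before any other worker can mutate the checkpoint map. The following minimal, self-contained Go sketch shows that lock-scoped save-then-publish pattern; the checkpoint struct, state fields, and publish function here are simplified, hypothetical stand-ins for the real Checkpoint and cursor.Publisher types, not the actual Filebeat code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// checkpoint is a simplified stand-in for the input's cursor state.
type checkpoint struct {
	LatestEntryTime time.Time
	ObjectName      string
}

type state struct {
	mu sync.Mutex
	cp *checkpoint
}

// saveForTx updates the checkpoint under the lock and returns it together with
// a done func that releases the lock. The caller publishes while the lock is
// still held and then calls done exactly once, so no other worker can
// interleave a conflicting save between the update and the publish.
func (s *state) saveForTx(name string, updated time.Time) (cp *checkpoint, done func()) {
	s.mu.Lock()
	s.cp.ObjectName = name
	s.cp.LatestEntryTime = updated
	return s.cp, func() { s.mu.Unlock() }
}

// publish is a placeholder for cursor.Publisher.Publish in the real input.
func publish(cp *checkpoint) error {
	fmt.Printf("published up to %s (%s)\n", cp.ObjectName, cp.LatestEntryTime.Format(time.RFC3339))
	return nil
}

func main() {
	st := &state{cp: &checkpoint{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			cp, done := st.saveForTx(fmt.Sprintf("object-%d", n), time.Now())
			if err := publish(cp); err != nil {
				fmt.Println("publish error:", err)
			}
			done() // unlock only after the checkpoint has been handed to the publisher
		}(i)
	}
	wg.Wait()
}

The essential point, mirrored by the job.go hunks above, is that done() runs only after Publish returns, so the save and the publish behave as a single atomic step per object instead of relying on a separate mutex around the publish call.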