Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][gcs] Fix for concurrency issues and context timeouts in the GCS input #35605

Merged
merged 10 commits into from
Jun 3, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]
- Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433]
- RFC5424 syslog timestamps with offset 'Z' will be treated as UTC rather than using the default timezone. {pull}35360[35360]
- [system] sync system/auth dataset with system integration 1.29.0. {pull}35581[35581]
- [GCS Input] - Fixed an issue where bucket_timeout was being applied to the entire bucket poll interval and not individual bucket object read operations. Fixed a map write concurrency issue arising from data races when using a high number of workers. Fixed the flaky tests that were present in the GCS test suite. {pull}35605[35605]
- Fix filestream false positive log error "filestream input with ID 'xyz' already exists" {issue}31767[31767]

*Heartbeat*
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/gcs/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
Expand Down Expand Up @@ -127,7 +127,6 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,

log := inputCtx.Logger.With("project_id", currentSource.ProjectId).With("bucket", currentSource.BucketName)
log.Infof("Running google cloud storage for project: %s", input.config.ProjectId)

var cp *Checkpoint
if !cursor.IsNew() {
if err := cursor.Unpack(&cp); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/input_stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
gax "github.com/googleapis/gax-go/v2"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
Expand Down
60 changes: 21 additions & 39 deletions x-pack/filebeat/input/gcs/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,13 @@ const (
)

func Test_StorageClient(t *testing.T) {
t.Skip("Flaky test: issue (could possibly affect this also) - https://github.com/elastic/beats/issues/34332")
tests := []struct {
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
checkJSON bool
isError error
unexpectedError error
name string
baseConfig map[string]interface{}
mockHandler func() http.Handler
expected map[string]bool
checkJSON bool
isError error
}{
{
name: "SingleBucketWithPoll_NoErr",
Expand All @@ -67,7 +65,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "SingleBucketWithoutPoll_NoErr",
Expand All @@ -89,7 +86,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
unexpectedError: nil,
},
{
name: "TwoBucketsWithPoll_NoErr",
Expand All @@ -116,7 +112,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
unexpectedError: context.Canceled,
},
{
name: "TwoBucketsWithoutPoll_NoErr",
Expand All @@ -143,7 +138,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json: true,
mock.Gcs_test_latest_object_data3_json: true,
},
unexpectedError: nil,
},
{
name: "SingleBucketWithPoll_InvalidBucketErr",
Expand All @@ -159,10 +153,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithoutPoll_InvalidBucketErr",
Expand All @@ -178,10 +171,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "TwoBucketsWithPoll_InvalidBucketErr",
Expand All @@ -200,10 +192,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("storage: bucket doesn't exist"),
},
{
name: "SingleBucketWithPoll_InvalidConfigValue",
Expand All @@ -219,10 +210,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "TwoBucketWithPoll_InvalidConfigValue",
Expand All @@ -241,10 +231,9 @@ func Test_StorageClient(t *testing.T) {
},
},
},
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
unexpectedError: nil,
mockHandler: mock.GCSServer,
expected: map[string]bool{},
isError: errors.New("requires value <= 5000 accessing 'max_workers'"),
},
{
name: "SingleBucketWithPoll_parseJSON",
Expand All @@ -267,7 +256,6 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_latest_object_ata_json_parsed: true,
mock.Gcs_test_latest_object_data3_json_parsed: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadJSON",
Expand All @@ -289,7 +277,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_json[1]: true,
mock.BeatsFilesBucket_log_json[2]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadOctetStreamJSON",
Expand All @@ -310,7 +297,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json[0]: true,
mock.BeatsFilesBucket_multiline_json[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadNDJSON",
Expand All @@ -331,7 +317,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_log_ndjson[0]: true,
mock.BeatsFilesBucket_log_ndjson[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadMultilineGzJSON",
Expand All @@ -352,7 +337,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_multiline_json_gz[0]: true,
mock.BeatsFilesBucket_multiline_json_gz[1]: true,
},
unexpectedError: context.Canceled,
},
{
name: "ReadJSONWithRootAsArray",
Expand All @@ -375,7 +359,6 @@ func Test_StorageClient(t *testing.T) {
mock.BeatsFilesBucket_json_array[2]: true,
mock.BeatsFilesBucket_json_array[3]: true,
},
unexpectedError: context.Canceled,
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -462,7 +445,6 @@ func Test_StorageClient(t *testing.T) {
}
}
}
assert.ErrorIs(t, g.Wait(), tt.unexpectedError)
})
}
}
Expand Down
29 changes: 15 additions & 14 deletions x-pack/filebeat/input/gcs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"
"unicode"

Expand All @@ -28,8 +27,6 @@ import (
)

type job struct {
// Mutex lock for concurrent publishes
mu sync.Mutex
// gcs bucket handle
bucket *storage.BucketHandle
// gcs object attribute struct
Expand Down Expand Up @@ -109,13 +106,13 @@ func (j *job) do(ctx context.Context, id string) {
Fields: fields,
}
event.SetID(objectID(j.hash, 0))
j.state.save(j.object.Name, j.object.Updated)
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(event, j.state.checkpoint()); err != nil {
// locks while data is being saved and published to avoid concurrent map read/writes
cp, done := j.state.saveForTx(j.object.Name, j.object.Updated)
if err := j.publisher.Publish(event, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
done()
}
}

Expand Down Expand Up @@ -216,19 +213,23 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er
// updates the offset after reading the file
// this avoids duplicates for the last read when resuming operation
offset = dec.InputOffset()
// locks while data is being saved and published to avoid concurrent map read/writes
var (
done func()
cp *Checkpoint
)
if !dec.More() {
// if this is the last object, then perform a complete state save
j.state.save(j.object.Name, j.object.Updated)
cp, done = j.state.saveForTx(j.object.Name, j.object.Updated)
} else {
// partially saves read state using offset
j.state.savePartial(j.object.Name, offset+relativeOffset)
cp, done = j.state.savePartialForTx(j.object.Name, offset+relativeOffset)
}
// locks while data is being published to avoid concurrent map read/writes
j.mu.Lock()
if err := j.publisher.Publish(evt, j.state.checkpoint()); err != nil {
if err := j.publisher.Publish(evt, cp); err != nil {
j.log.Errorw("job encountered an error", "gcs.jobId", id, "error", err)
}
j.mu.Unlock()
// unlocks after data is saved and published
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the assumptions that you are making about the published state of the event after Publish returns? I see this comment about it being "saved and published" and that's not correct. So if that is important to your implementation you'll need changes.

When Publish returns, the event has been queued internally. It will have undergone processing via Beats processors then put onto the queue. If it needs to know when the event has been accepted by the destination output then you can register an EventListener to get a callback.

// Callbacks for when events are added / acknowledged
EventListener EventListener

As an example, the GCP pub/sub input uses an EventListener to defer ACK'ing the pubsub message until the event has been written to ES.

EventListener: acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, priv := range privates {
if msg, ok := priv.(*pubsub.Message); ok {
msg.Ack()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewkroh thanks for the feedback, I will revisit this when I do some planned feature enhancements for the gcs input in the near future, will have to separate out the cursor publish from the event publish to properly implement this feature. For now I'm merging the PR since it's related to an SDH.

done()
}
return nil
}
Expand Down
18 changes: 7 additions & 11 deletions x-pack/filebeat/input/gcs/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,11 @@ func newScheduler(ctx context.Context, publisher cursor.Publisher, bucket *stora
// Schedule, is responsible for fetching & scheduling jobs using the workerpool model
func (s *scheduler) schedule() error {
if !s.src.Poll {
ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
defer cancel()
return s.scheduleOnce(ctxWithTimeout)
return s.scheduleOnce(s.parentCtx)
}

for {
ctxWithTimeout, cancel := context.WithTimeout(s.parentCtx, s.src.BucketTimeOut)
defer cancel()

err := s.scheduleOnce(ctxWithTimeout)
err := s.scheduleOnce(s.parentCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -92,9 +87,9 @@ func (l *limiter) release() {
l.wg.Done()
}

func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
func (s *scheduler) scheduleOnce(ctx context.Context) error {
defer s.limiter.wait()
pager := s.fetchObjectPager(ctxWithTimeout, s.src.MaxWorkers)
pager := s.fetchObjectPager(ctx, *s.cfg.MaxWorkers)
for {
var objects []*storage.ObjectAttrs
nextPageToken, err := pager.NextPage(&objects)
Expand All @@ -107,7 +102,7 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
if !s.state.checkpoint().LatestEntryTime.IsZero() {
jobs = s.moveToLastSeenJob(jobs)
if len(s.state.checkpoint().FailedJobs) > 0 {
jobs = s.addFailedJobs(ctxWithTimeout, jobs)
jobs = s.addFailedJobs(ctx, jobs)
}
}

Expand All @@ -118,14 +113,15 @@ func (s *scheduler) scheduleOnce(ctxWithTimeout context.Context) error {
s.limiter.acquire()
go func() {
defer s.limiter.release()
job.do(s.parentCtx, id)
job.do(ctx, id)
}()
}

if nextPageToken == "" {
break
}
}

return nil
}

Expand Down
26 changes: 16 additions & 10 deletions x-pack/filebeat/input/gcs/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ func newState() *state {
}
}

// save, saves/updates the current state for cursor checkpoint
func (s *state) save(name string, lastModifiedOn time.Time) {
// saveForTx updates and returns the current state checkpoint, locks the state
// and returns an unlock function, done. The caller must call done when
// s and cp are no longer needed in a locked state. done may not be called
// more than once.
func (s *state) saveForTx(name string, lastModifiedOn time.Time) (cp *Checkpoint, done func()) {
s.mu.Lock()
delete(s.cp.LastProcessedOffset, name)
delete(s.cp.IsRootArray, name)
Expand All @@ -68,20 +71,23 @@ func (s *state) save(name string, lastModifiedOn time.Time) {
// clear entry if this is a failed job
delete(s.cp.FailedJobs, name)
}
s.mu.Unlock()
return s.cp, func() { s.mu.Unlock() }
}

// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
// savePartialForTx partially updates and returns the current state checkpoint, locks the state
// and returns an unlock function, done. The caller must call done when
// s and cp are no longer needed in a locked state. done may not be called
// more than once.
func (s *state) savePartialForTx(name string, offset int64) (cp *Checkpoint, done func()) {
s.mu.Lock()
s.cp.IsRootArray[name] = true
s.mu.Unlock()
s.cp.LastProcessedOffset[name] = offset
return s.cp, func() { s.mu.Unlock() }
}

// savePartial, partially saves/updates the current state for cursor checkpoint
func (s *state) savePartial(name string, offset int64) {
// setRootArray, sets boolean true for objects that have their roots defined as an array type
func (s *state) setRootArray(name string) {
s.mu.Lock()
s.cp.LastProcessedOffset[name] = offset
s.cp.IsRootArray[name] = true
s.mu.Unlock()
}

Expand Down