Add asynchronous ACK handling to S3 and SQS inputs #40699

Merged: 41 commits, merged Oct 16, 2024

Commits
e9ccebe
working on S3 ack tracking
faec May 21, 2024
b17f7b5
handle S3 acks asynchronously
faec May 22, 2024
d023027
fix loop variable scope
faec May 22, 2024
aedb5a2
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec May 31, 2024
ed4953a
remove leftover print
faec May 31, 2024
12a39b4
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Jun 6, 2024
7e70ea6
cleanup
faec Jun 6, 2024
ff4f57c
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Jun 20, 2024
2d18b28
restore struct field
faec Jun 24, 2024
6bc5e42
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Jul 11, 2024
b4cc30e
updates in progress
faec Aug 1, 2024
d4ecc1f
leftover file
faec Aug 1, 2024
64a96d4
use a common ack helper in s3 and sqs
faec Aug 6, 2024
7a20250
in progress
faec Aug 7, 2024
977e4c5
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Aug 23, 2024
5282671
test updates
faec Aug 30, 2024
1548ac2
fix/update SQS error handling logic
faec Sep 5, 2024
2358241
close ack handler in s3 worker
faec Sep 5, 2024
8c447bc
Pass through all processing parameters
faec Oct 1, 2024
2deefa7
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Oct 1, 2024
8cde326
regenerate mock interfaces
faec Oct 3, 2024
c2131e1
update mock calls
faec Oct 3, 2024
1a07561
clean up dead code
faec Oct 3, 2024
c99b94d
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Oct 3, 2024
da1c5ed
Merge branch 'main' into awss3-ack-handling
pierrehilbert Oct 4, 2024
70821fa
Merge branch 'main' into awss3-ack-handling
pierrehilbert Oct 4, 2024
5977072
finish config/doc updates, address review comments
faec Oct 14, 2024
6fe1183
remove helper that was replaced with shared lib
faec Oct 14, 2024
ba6ecfe
make check
faec Oct 14, 2024
f1732f8
re-correct doc yml
faec Oct 14, 2024
83ba548
make check again
faec Oct 14, 2024
577759c
update changelog
faec Oct 14, 2024
7ba1374
Merge branch 'awss3-ack-handling' of github.com:faec/beats into awss3…
faec Oct 14, 2024
cf90022
Merge branch 'main' of github.com:elastic/beats into awss3-ack-handling
faec Oct 14, 2024
a743c08
make update
faec Oct 14, 2024
48025b5
linter / CI fixes
faec Oct 15, 2024
10ecfd3
Fix benchmark test ack counting
faec Oct 15, 2024
7a38f55
edit changelog
faec Oct 15, 2024
84cb422
more mock test fixes
faec Oct 15, 2024
b088ca7
fix more mock details
faec Oct 15, 2024
bbbd1f9
fix race condition in mock sequencing
faec Oct 16, 2024
73 changes: 73 additions & 0 deletions libbeat/common/fifo/fifo.go
Member: Maybe add a test file as well? Example test would suffice.

@@ -0,0 +1,73 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fifo

// FIFO is a minimal first-in first-out queue based on a singly linked list.
type FIFO[T any] struct {
first *node[T]
last *node[T]
}

type node[T any] struct {
next *node[T]
value T
}

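// Add appends a value to the end of the queue.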
func (f *FIFO[T]) Add(value T) {
newNode := &node[T]{value: value}
if f.first == nil {
f.first = newNode
} else {
f.last.next = newNode
}
f.last = newNode
}

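// Empty reports whether the queue contains no values.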
func (f *FIFO[T]) Empty() bool {
return f.first == nil
}

// First returns the first value (if present) without removing it from the
// queue. If the queue is empty, it returns the zero value; use f.Empty() to
// distinguish that case.
func (f *FIFO[T]) First() T {
if f.first == nil {
var none T
return none
}
return f.first.value
}

// ConsumeFirst returns the first value (if present) and removes it from the
// queue. If the queue is empty, it returns the zero value; use f.Empty() to
// distinguish that case.
func (f *FIFO[T]) ConsumeFirst() T {
result := f.First()
f.Remove()
return result
}

// Remove drops the first entry in the queue. It does nothing if the FIFO is empty.
func (f *FIFO[T]) Remove() {
if f.first != nil {
f.first = f.first.next
if f.first == nil {
f.last = nil
}
}
}
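
Along the lines of the reviewer's suggestion, a minimal example test might look like the following. This is a sketch only, written against the API shown above; the file name and the exact assertions are illustrative rather than part of the change.

// fifo_test.go: illustrative example test for the FIFO type.
package fifo

import "testing"

func TestFIFOOrdering(t *testing.T) {
	var q FIFO[int]

	if !q.Empty() {
		t.Fatal("a zero-value FIFO should be empty")
	}

	// Values must come back out in insertion order.
	for _, v := range []int{1, 2, 3} {
		q.Add(v)
	}
	for _, want := range []int{1, 2, 3} {
		if got := q.First(); got != want {
			t.Fatalf("First() = %d, want %d", got, want)
		}
		if got := q.ConsumeFirst(); got != want {
			t.Fatalf("ConsumeFirst() = %d, want %d", got, want)
		}
	}

	if !q.Empty() {
		t.Fatal("FIFO should be empty after consuming all values")
	}

	// Remove on an empty queue is a no-op and must not panic.
	q.Remove()
}
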
86 changes: 86 additions & 0 deletions x-pack/filebeat/input/awss3/acks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/fifo"
)

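// awsACKHandler tracks, for each S3 object or SQS message, how many events
// were published and a callback to run once the publisher pipeline has
// acknowledged all of them.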
type awsACKHandler struct {
pending fifo.FIFO[pendingACK]
ackedCount int

pendingChan chan pendingACK
ackChan chan int
}

type pendingACK struct {
eventCount int
ackCallback func()
}

func newAWSACKHandler() *awsACKHandler {
handler := &awsACKHandler{
pendingChan: make(chan pendingACK, 10),
ackChan: make(chan int, 10),
Comment on lines +46 to +47
Contributor: At a minimum I think we need a comment on why 10. But I've got a couple of questions. Do these always need to be in sync? Would we ever want to change these?

Contributor: Yeah, documenting why we picked 10, and whether they need to stay in sync, would be useful. Do we ever need to make this configurable?

}
go handler.run()
return handler
}

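// Add registers a pending entry: the number of events published for it and
// the callback to invoke once all of those events have been acknowledged.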
func (ah *awsACKHandler) Add(eventCount int, ackCallback func()) {
ah.pendingChan <- pendingACK{
eventCount: eventCount,
ackCallback: ackCallback,
}
}

// Called when a worker is closing, to indicate to the ack handler that it
// should shut down as soon as the current pending list is acknowledged.
func (ah *awsACKHandler) Close() {
close(ah.pendingChan)
}

func (ah *awsACKHandler) pipelineEventListener() beat.EventListener {
return acker.TrackingCounter(func(_ int, total int) {
// Notify the ack handler goroutine
ah.ackChan <- total
})
}

// The handler's main loop: it consumes both incoming pending-ACK metadata
// and ACK confirmations from the publisher pipeline.
func (ah *awsACKHandler) run() {
for {
select {
case result, ok := <-ah.pendingChan:
if ok {
ah.pending.Add(result)
} else {
// Channel is closed, reset so we don't receive any more values
ah.pendingChan = nil
}
case count := <-ah.ackChan:
ah.ackedCount += count
}

// Finalize any objects that are now completed
for !ah.pending.Empty() && ah.ackedCount >= ah.pending.First().eventCount {
result := ah.pending.ConsumeFirst()
ah.ackedCount -= result.eventCount
// Run finalization asynchronously so we don't block the SQS worker
// or the queue by ignoring the ack handler's input channels. Ordering
// is no longer important at this point.
go result.ackCallback()
}

// If the input is closed and all acks are completed, we're done
if ah.pending.Empty() && ah.pendingChan == nil {
return
}
}
}
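
For context on how this handler is meant to be wired in, here is a rough sketch of the lifecycle: a pipeline client is connected with the handler's event listener, and on shutdown the handler is closed so its goroutine exits once the remaining pending callbacks have been acknowledged. The function names and worker shape are hypothetical illustrations, not the PR's actual worker code; the beat.ClientConfig EventListener hook is assumed to be the standard libbeat connection option.

// Hypothetical wiring sketch (same package); not the PR's actual worker code.
package awss3

import (
	"github.com/elastic/beats/v7/libbeat/beat"
)

func exampleConnect(pipeline beat.Pipeline) (beat.Client, *awsACKHandler, error) {
	acks := newAWSACKHandler()

	// Route the pipeline's ACK notifications into the handler's run loop.
	client, err := pipeline.ConnectWith(beat.ClientConfig{
		EventListener: acks.pipelineEventListener(),
	})
	if err != nil {
		return nil, nil, err
	}
	return client, acks, nil
}

// exampleShutdown stops publishing, then closes the handler so its goroutine
// exits after the already-pending entries have been acknowledged.
func exampleShutdown(client beat.Client, acks *awsACKHandler) {
	_ = client.Close()
	acks.Close()
}
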
8 changes: 4 additions & 4 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -211,7 +211,6 @@ file_selectors:
func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkResult {
return testing.Benchmark(func(b *testing.B) {
var err error
pipeline := &fakePipeline{}

config := makeBenchmarkConfig(t)
config.MaxNumberOfMessages = maxMessagesInflight
@@ -221,7 +220,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkResult
sqsReader.sqs, err = newConstantSQS()
require.NoError(t, err)
sqsReader.s3 = newConstantS3(t)
sqsReader.msgHandler, err = sqsReader.createEventProcessor(pipeline)
sqsReader.msgHandler, err = sqsReader.createEventProcessor()
require.NoError(t, err, "createEventProcessor must succeed")

ctx, cancel := context.WithCancel(context.Background())
@@ -307,6 +306,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
event.Private.(*awscommon.EventACKTracker).ACK()
})
pipeline := pubtest.PublisherWithClient(client)

defer func() {
_ = client.Close()
@@ -342,13 +342,13 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
states, err := newStates(nil, store)
assert.NoError(t, err, "states creation should succeed")

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{})
s3EventHandlerFactory := newS3ObjectProcessorFactory(metrics, s3API, config.FileSelectors, backupConfig{})
s3Poller := &s3PollerInput{
log: logp.NewLogger(inputName),
config: config,
metrics: metrics,
s3: s3API,
client: client,
pipeline: pipeline,
s3ObjectHandler: s3EventHandlerFactory,
states: states,
provider: "provider",
34 changes: 5 additions & 29 deletions x-pack/filebeat/input/awss3/interfaces.go
@@ -17,7 +17,6 @@ import (
"github.com/aws/smithy-go/middleware"

"github.com/elastic/beats/v7/libbeat/beat"
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -41,25 +40,9 @@ import (
const s3RequestURLMetadataKey = `x-beat-s3-request-url`

type sqsAPI interface {
sqsReceiver
sqsDeleter
sqsVisibilityChanger
sqsAttributeGetter
}

type sqsReceiver interface {
ReceiveMessage(ctx context.Context, maxMessages int) ([]types.Message, error)
}

type sqsDeleter interface {
DeleteMessage(ctx context.Context, msg *types.Message) error
}

type sqsVisibilityChanger interface {
ChangeMessageVisibility(ctx context.Context, msg *types.Message, timeout time.Duration) error
}

type sqsAttributeGetter interface {
GetQueueAttributes(ctx context.Context, attr []types.QueueAttributeName) (map[string]string, error)
}

Expand All @@ -68,7 +51,7 @@ type sqsProcessor interface {
// given message and is responsible for updating the message's visibility
// timeout while it is being processed and for deleting it when processing
// completes successfully.
ProcessSQS(ctx context.Context, msg *types.Message) error
ProcessSQS(ctx context.Context, msg *types.Message, eventCallback func(e beat.Event)) sqsProcessingResult
}

// ------
@@ -103,25 +86,18 @@ type s3ObjectHandlerFactory interface {
// Create returns a new s3ObjectHandler that can be used to process the
// specified S3 object. If the handler is not configured to process the
// given S3 object (based on key name) then it will return nil.
Create(ctx context.Context, log *logp.Logger, client beat.Client, acker *awscommon.EventACKTracker, obj s3EventV2) s3ObjectHandler
Create(ctx context.Context, obj s3EventV2) s3ObjectHandler
}

type s3ObjectHandler interface {
// ProcessS3Object downloads the S3 object, parses it, creates events, and
// publishes them. It returns when processing finishes or when it encounters
// an unrecoverable error. It does not wait for the events to be ACKed by
// the publisher before returning (use eventACKTracker's Wait() method to
// determine this).
ProcessS3Object() error
// passes to the given callback. It returns when processing finishes or
// when it encounters an unrecoverable error.
ProcessS3Object(log *logp.Logger, eventCallback func(e beat.Event)) error

// FinalizeS3Object finalizes processing of an S3 object after the current
// batch is finished.
FinalizeS3Object() error

// Wait waits for every event published by ProcessS3Object() to be ACKed
// by the publisher before returning. Internally it uses the
// s3ObjectHandler eventACKTracker's Wait() method
Wait()
}

// ------
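
To make the new contract concrete, here is a rough sketch of how a caller might drive the callback-based interface and defer finalization to the ACK handler. The function name and surrounding variables are hypothetical stand-ins for the real worker state in this package; only the s3ObjectHandlerFactory and s3ObjectHandler methods and awsACKHandler.Add come from this PR, and the import paths are assumed to match the package's existing ones.

// Hypothetical usage sketch (same package); the real callers live in the S3 and SQS workers.
package awss3

import (
	"context"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/elastic-agent-libs/logp"
)

func processOneObject(ctx context.Context, factory s3ObjectHandlerFactory, acks *awsACKHandler,
	client beat.Client, log *logp.Logger, obj s3EventV2) {

	handler := factory.Create(ctx, obj)
	if handler == nil {
		// The object was filtered out, e.g. by file_selectors.
		return
	}

	// Publish the object's events through the callback, counting them so the
	// ACK handler knows when the object has been fully acknowledged.
	eventCount := 0
	if err := handler.ProcessS3Object(log, func(e beat.Event) {
		eventCount++
		client.Publish(e)
	}); err != nil {
		log.Errorf("failed to process object: %v", err)
		return
	}

	// Finalize (e.g. back up or delete the source object) only after every
	// event from this object has been acknowledged end to end.
	acks.Add(eventCount, func() {
		if err := handler.FinalizeS3Object(); err != nil {
			log.Errorf("failed to finalize object: %v", err)
		}
	})
}
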