Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix awscloudwatch worker allocation #38953

Merged
merged 6 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
- [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917]
- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953]
- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985]
- entity-analytics input: Improve structured logging. {pull}38990[38990]
- Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962]
Expand Down
134 changes: 104 additions & 30 deletions x-pack/filebeat/input/awscloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,69 @@ import (
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"

awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
"github.com/elastic/elastic-agent-libs/logp"
)

type cloudwatchPoller struct {
numberOfWorkers int
apiSleep time.Duration
config config
region string
logStreams []*string
logStreamPrefix string
startTime int64
endTime int64
workerSem *awscommon.Sem
log *logp.Logger
metrics *inputMetrics
workersListingMap *sync.Map
workersProcessingMap *sync.Map

// When a worker is ready for its next task, it should
// send to workRequestChan and then read from workResponseChan.
// The worker can cancel the request based on other context
// cancellations, but if the write succeeds it _must_ read from
// workResponseChan to avoid deadlocking the main loop.
workRequestChan chan struct{}
workResponseChan chan workResponse

workerWg sync.WaitGroup
}

type workResponse struct {
logGroup string
startTime, endTime time.Time
}

func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
awsRegion string, apiSleep time.Duration,
numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller {
awsRegion string, config config) *cloudwatchPoller {
if metrics == nil {
metrics = newInputMetrics("", nil)
}

return &cloudwatchPoller{
numberOfWorkers: numberOfWorkers,
apiSleep: apiSleep,
region: awsRegion,
logStreams: logStreams,
logStreamPrefix: logStreamPrefix,
startTime: int64(0),
endTime: int64(0),
workerSem: awscommon.NewSem(numberOfWorkers),
log: log,
metrics: metrics,
region: awsRegion,
config: config,
workersListingMap: new(sync.Map),
workersProcessingMap: new(sync.Map),
// workRequestChan is unbuffered to guarantee that
// the worker and main loop agree whether a request
// was sent. workerResponseChan is buffered so the
// main loop doesn't have to block on the workers
// while distributing new data.
workRequestChan: make(chan struct{}),
workResponseChan: make(chan workResponse, 10),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this 10 chosen for a reason? Or just good practice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The choice to buffer it is because it lets the polling loop send multiple responses in one scheduler interval, but the choice of 10 is just an arbitrary small number. (Heuristically I think of it, buffer of 10 = ~90% less contention in the main loop than it would have had if it was synchronous. And I don't know exactly how much contention the synchronous case is, but surely eliminating 90% of it will avoid any bottleneck.)

}
}

func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) {
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
if err != nil {
var errRequestCanceled *awssdk.RequestCanceledError
if errors.As(err, &errRequestCanceled) {
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err)
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled)
}
p.log.Error("getLogEventsFromCloudWatch failed: ", err)
}
}

// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error {
// construct FilterLogEventsInput
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
Expand All @@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))

// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep)
time.Sleep(p.apiSleep)
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep)
time.Sleep(p.config.APISleep)
p.log.Debug("done sleeping")

p.log.Debugf("Processing #%v events", len(logEvents))
Expand All @@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
return nil
}

func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
LogGroupName: awssdk.String(logGroup),
StartTime: awssdk.Int64(startTime),
EndTime: awssdk.Int64(endTime),
StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)),
EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)),
}

if len(p.logStreams) > 0 {
for _, stream := range p.logStreams {
if len(p.config.LogStreams) > 0 {
for _, stream := range p.config.LogStreams {
filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream)
}
}

if p.logStreamPrefix != "" {
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix)
if p.config.LogStreamPrefix != "" {
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix)
}
return filterLogEventsInput
}

func (p *cloudwatchPoller) startWorkers(
ctx context.Context,
svc *cloudwatchlogs.Client,
logProcessor *logProcessor,
) {
for i := 0; i < p.config.NumberOfWorkers; i++ {
p.workerWg.Add(1)
go func() {
defer p.workerWg.Done()
for {
var work workResponse
select {
case <-ctx.Done():
return
case p.workRequestChan <- struct{}{}:
work = <-p.workResponseChan
}

p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup)
p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor)
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup)
}
}()
}
}

// receive implements the main run loop that distributes tasks to the worker
// goroutines. It accepts a "clock" callback (which on a live input should
// equal time.Now) to allow deterministic unit tests.
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) {
defer p.workerWg.Wait()
// startTime and endTime are the bounds of the current scanning interval.
// If we're starting at the end of the logs, advance the start time to the
// most recent scan window
var startTime time.Time
endTime := clock().Add(-p.config.Latency)
if p.config.StartPosition == "end" {
startTime = endTime.Add(-p.config.ScanFrequency)
}
for ctx.Err() == nil {
for _, lg := range logGroupNames {
select {
case <-ctx.Done():
return
case <-p.workRequestChan:
p.workResponseChan <- workResponse{
logGroup: lg,
startTime: startTime,
endTime: endTime,
}
}
}

// Delay for ScanFrequency after finishing a time span
p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency)
select {
case <-time.After(p.config.ScanFrequency):
case <-ctx.Done():
}
p.log.Debug("done sleeping")

// Advance to the next time span
startTime, endTime = endTime, clock().Add(-p.config.Latency)
}
}
Loading
Loading