Skip to content

Commit

Permalink
x-pack/filebeat/input/awss3 ; Fix nil hit panic when a getter is invo…
Browse files Browse the repository at this point in the history
…ked on input metric (#36101)

(cherry picked from commit cc740f9)
  • Loading branch information
bhapas authored and mergify[bot] committed Jul 24, 2023
1 parent 715acb9 commit 79c3e14
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027]
- Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008]
- Fix handling of region name configuration in awss3 input {pull}36034[36034]
- Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077]
- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]

*Heartbeat*
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ observe the activity of the input.
| `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge).
| `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html<GetQueueAttributes>. A value of `-1` indicates the metric is uninitialized or could not be collected due to an error.
| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
Expand Down
38 changes: 27 additions & 11 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,28 +409,44 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
}

func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) {
// Run GetApproximateMessageCount before start of timer to set initial count for sqs waiting metric
// This is to avoid misleading values in metric when sqs messages are processed before the ticker channel kicks in
if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn {
return
}

t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
count, err := receiver.GetApproximateMessageCount(ctx)

var apiError smithy.APIError
if errors.As(err, &apiError) {
switch apiError.ErrorCode() {
case sqsAccessDeniedErrorCode:
// stop polling if auth error is encountered
receiver.metrics.setSQSMessagesWaiting(int64(count))
return
}
if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn {
return
}
}
}
}

receiver.metrics.setSQSMessagesWaiting(int64(count))
// updateMessageCount runs GetApproximateMessageCount for the given context and updates the receiver metric with the count returning false on no error
// If there is an error, the metric is reinitialized to -1 and true is returned
func updateMessageCount(receiver *sqsReader, ctx context.Context) bool {
count, err := receiver.GetApproximateMessageCount(ctx)

var apiError smithy.APIError
if errors.As(err, &apiError) {
switch apiError.ErrorCode() {
case sqsAccessDeniedErrorCode:
// stop polling if auth error is encountered
// Set it back to -1 because there is a permission error
receiver.metrics.sqsMessagesWaiting.Set(int64(-1))
return true
}
}

receiver.metrics.sqsMessagesWaiting.Set(int64(count))
return false
}

// boolPtr returns a pointer to b.
Expand Down
6 changes: 2 additions & 4 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,11 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
//assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
//assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -426,10 +425,9 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
//assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
//assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}
16 changes: 4 additions & 12 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,6 @@ func (m *inputMetrics) Close() {
m.unregister()
}

func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
if m.sqsMessagesWaiting == nil {
// if metric not initialized, and count is -1, do nothing
if count == -1 {
return
}
m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge")
}

m.sqsMessagesWaiting.Set(count)
}

// beginSQSWorker tracks the start of a new SQS worker. The returned ID
// must be used to call endSQSWorker when the worker finishes. It also
// increments the sqsMessagesInflight counter.
Expand Down Expand Up @@ -174,6 +162,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsMessagesWaiting: monitoring.NewInt(reg, "sqs_messages_waiting_gauge"),
sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
Expand All @@ -186,6 +175,9 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"),
s3ObjectProcessingTime: metrics.NewUniformSample(1024),
}

// Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1).
out.sqsMessagesWaiting.Set(int64(-1))
adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept).
Expand Down
32 changes: 32 additions & 0 deletions x-pack/filebeat/input/awss3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,38 @@ func TestInputMetricsClose(t *testing.T) {
})
}

// TestNewInputMetricsInstance asserts that all the metrics are initialized
// when a newInputMetrics method is invoked. This avoids nil hit panics when
// a getter is invoked on any uninitialized metric.
func TestNewInputMetricsInstance(t *testing.T) {
reg := monitoring.NewRegistry()
metrics := newInputMetrics("some-new-metric-test", reg, 1)

assert.NotNil(t, metrics.sqsMessagesWaiting,
metrics.sqsMaxMessagesInflight,
metrics.sqsWorkerStartTimes,
metrics.sqsWorkerUtilizationLastUpdate,
metrics.sqsMessagesReceivedTotal,
metrics.sqsVisibilityTimeoutExtensionsTotal,
metrics.sqsMessagesInflight,
metrics.sqsMessagesReturnedTotal,
metrics.sqsMessagesDeletedTotal,
metrics.sqsMessagesWaiting,
metrics.sqsWorkerUtilization,
metrics.sqsMessageProcessingTime,
metrics.sqsLagTime,
metrics.s3ObjectsRequestedTotal,
metrics.s3ObjectsAckedTotal,
metrics.s3ObjectsListedTotal,
metrics.s3ObjectsProcessedTotal,
metrics.s3BytesProcessedTotal,
metrics.s3EventsCreatedTotal,
metrics.s3ObjectsInflight,
metrics.s3ObjectProcessingTime)

assert.Equal(t, int64(-1), metrics.sqsMessagesWaiting.Get())
}

func TestInputMetricsSQSWorkerUtilization(t *testing.T) {
const interval = 5000

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (r *sqsReader) Receive(ctx context.Context) error {
r.log.Debugf("Received %v SQS messages.", len(msgs))
r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs)))
workerWg.Add(len(msgs))

for _, msg := range msgs {
go func(msg types.Message, start time.Time) {
id := r.metrics.beginSQSWorker()
Expand Down

0 comments on commit 79c3e14

Please sign in to comment.