Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/awss3 ; Fix nil hit panic when a getter is invoked on input metric #36101

Merged
merged 13 commits into from
Jul 24, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027]
- Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008]
- Fix handling of region name configuration in awss3 input {pull}36034[36034]
- Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077]
- Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107]

*Heartbeat*
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ observe the activity of the input.
| `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge).
| `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html<GetQueueAttributes>. A value of `-1` indicates the metric is uninitialized or could not be collected due to an error.
| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
Expand Down
38 changes: 27 additions & 11 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,28 +409,44 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
}

func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) {
// Run GetApproximateMessageCount before start of timer to set initial count for sqs waiting metric
// This is to avoid misleading values in metric when sqs messages are processed before the ticker channel kicks in
if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn {
return
}

t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
count, err := receiver.GetApproximateMessageCount(ctx)

var apiError smithy.APIError
if errors.As(err, &apiError) {
switch apiError.ErrorCode() {
case sqsAccessDeniedErrorCode:
// stop polling if auth error is encountered
receiver.metrics.setSQSMessagesWaiting(int64(count))
return
}
if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn {
return
}
}
}
}

receiver.metrics.setSQSMessagesWaiting(int64(count))
// updateMessageCount runs GetApproximateMessageCount for the given context and updates the receiver metric with the count returning false on no error
// If there is an error, the metric is reinitialized to -1 and true is returned
func updateMessageCount(receiver *sqsReader, ctx context.Context) bool {
count, err := receiver.GetApproximateMessageCount(ctx)

var apiError smithy.APIError
if errors.As(err, &apiError) {
switch apiError.ErrorCode() {
case sqsAccessDeniedErrorCode:
// stop polling if auth error is encountered
// Set it back to -1 because there is a permission error
receiver.metrics.sqsMessagesWaiting.Set(int64(-1))
return true
}
}

receiver.metrics.sqsMessagesWaiting.Set(int64(count))
return false
}

// boolPtr returns a pointer to b.
Expand Down
6 changes: 2 additions & 4 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,11 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
//assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
//assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -426,10 +425,9 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0)
//assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
//assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077
assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end
}
16 changes: 4 additions & 12 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,6 @@ func (m *inputMetrics) Close() {
m.unregister()
}

func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
if m.sqsMessagesWaiting == nil {
// if metric not initialized, and count is -1, do nothing
if count == -1 {
return
}
m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge")
}

m.sqsMessagesWaiting.Set(count)
}

// beginSQSWorker tracks the start of a new SQS worker. The returned ID
// must be used to call endSQSWorker when the worker finishes. It also
// increments the sqsMessagesInflight counter.
Expand Down Expand Up @@ -174,6 +162,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsMessagesWaiting: monitoring.NewInt(reg, "sqs_messages_waiting_gauge"),
bhapas marked this conversation as resolved.
Show resolved Hide resolved
sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
Expand All @@ -186,6 +175,9 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"),
s3ObjectProcessingTime: metrics.NewUniformSample(1024),
}

// Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1).
out.sqsMessagesWaiting.Set(int64(-1))
adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept).
Expand Down
32 changes: 32 additions & 0 deletions x-pack/filebeat/input/awss3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,38 @@ func TestInputMetricsClose(t *testing.T) {
})
}

// TestNewInputMetricsInstance asserts that all the metrics are initialized
// when a newInputMetrics method is invoked. This avoids nil hit panics when
// a getter is invoked on any uninitialized metric.
func TestNewInputMetricsInstance(t *testing.T) {
reg := monitoring.NewRegistry()
metrics := newInputMetrics("some-new-metric-test", reg, 1)

assert.NotNil(t, metrics.sqsMessagesWaiting,
metrics.sqsMaxMessagesInflight,
metrics.sqsWorkerStartTimes,
metrics.sqsWorkerUtilizationLastUpdate,
metrics.sqsMessagesReceivedTotal,
metrics.sqsVisibilityTimeoutExtensionsTotal,
metrics.sqsMessagesInflight,
metrics.sqsMessagesReturnedTotal,
metrics.sqsMessagesDeletedTotal,
metrics.sqsMessagesWaiting,
metrics.sqsWorkerUtilization,
metrics.sqsMessageProcessingTime,
metrics.sqsLagTime,
metrics.s3ObjectsRequestedTotal,
metrics.s3ObjectsAckedTotal,
metrics.s3ObjectsListedTotal,
metrics.s3ObjectsProcessedTotal,
metrics.s3BytesProcessedTotal,
metrics.s3EventsCreatedTotal,
metrics.s3ObjectsInflight,
metrics.s3ObjectProcessingTime)

assert.Equal(t, int64(-1), metrics.sqsMessagesWaiting.Get())
}

func TestInputMetricsSQSWorkerUtilization(t *testing.T) {
const interval = 5000

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (r *sqsReader) Receive(ctx context.Context) error {
r.log.Debugf("Received %v SQS messages.", len(msgs))
r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs)))
workerWg.Add(len(msgs))

for _, msg := range msgs {
go func(msg types.Message, start time.Time) {
id := r.metrics.beginSQSWorker()
Expand Down