diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 33f6f4067760..9e6d4961e9a0 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,12 +187,12 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -426,10 +426,10 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index df535a2d4734..0bff7d057cde 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -174,6 +174,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), + sqsMessagesWaiting: monitoring.NewInt(reg, "sqs_messages_waiting_gauge"), sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index fc39786cf0b2..180310881eb2 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -29,6 +29,36 @@ func TestInputMetricsClose(t *testing.T) { }) } +// TestNewInputMetricsInstance asserts that all the metrics are initialized +// when a newInputMetrics method is invoked. This avoids nil hit panics when +// a getter is invoked on any uninitialized metric. +func TestNewInputMetricsInstance(t *testing.T) { + reg := monitoring.NewRegistry() + metrics := newInputMetrics("some-new-metric-test", reg, 1) + + assert.NotNil(t, metrics.sqsMessagesWaiting, + metrics.sqsMaxMessagesInflight, + metrics.sqsWorkerStartTimes, + metrics.sqsWorkerUtilizationLastUpdate, + metrics.sqsMessagesReceivedTotal, + metrics.sqsVisibilityTimeoutExtensionsTotal, + metrics.sqsMessagesInflight, + metrics.sqsMessagesReturnedTotal, + metrics.sqsMessagesDeletedTotal, + metrics.sqsMessagesWaiting, + metrics.sqsWorkerUtilization, + metrics.sqsMessageProcessingTime, + metrics.sqsLagTime, + metrics.s3ObjectsRequestedTotal, + metrics.s3ObjectsAckedTotal, + metrics.s3ObjectsListedTotal, + metrics.s3ObjectsProcessedTotal, + metrics.s3BytesProcessedTotal, + metrics.s3EventsCreatedTotal, + metrics.s3ObjectsInflight, + metrics.s3ObjectProcessingTime) +} + func TestInputMetricsSQSWorkerUtilization(t *testing.T) { const interval = 5000