From b355b435155f493cf5ac61ad16d85abd793f5a6e Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Tue, 18 Jul 2023 14:03:46 +0200 Subject: [PATCH 1/8] Fix nil hit panic when a getter is invoked on input metric --- CHANGELOG.next.asciidoc | 1 + .../input/awss3/input_integration_test.go | 8 ++--- x-pack/filebeat/input/awss3/metrics.go | 1 + x-pack/filebeat/input/awss3/metrics_test.go | 30 +++++++++++++++++++ 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8b01cf6c6621..d0bfae13ff4c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027] - Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008] - Fix handling of region name configuration in awss3 input {pull}36034[36034] +- Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 33f6f4067760..9e6d4961e9a0 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,12 +187,12 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -426,10 +426,10 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index df535a2d4734..0bff7d057cde 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -174,6 +174,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), + sqsMessagesWaiting: monitoring.NewInt(reg, "sqs_messages_waiting_gauge"), sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index fc39786cf0b2..180310881eb2 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -29,6 +29,36 @@ func TestInputMetricsClose(t *testing.T) { }) } +// TestNewInputMetricsInstance asserts that all the metrics are initialized +// when a newInputMetrics method is invoked. This avoids nil hit panics when +// a getter is invoked on any uninitialized metric. +func TestNewInputMetricsInstance(t *testing.T) { + reg := monitoring.NewRegistry() + metrics := newInputMetrics("some-new-metric-test", reg, 1) + + assert.NotNil(t, metrics.sqsMessagesWaiting, + metrics.sqsMaxMessagesInflight, + metrics.sqsWorkerStartTimes, + metrics.sqsWorkerUtilizationLastUpdate, + metrics.sqsMessagesReceivedTotal, + metrics.sqsVisibilityTimeoutExtensionsTotal, + metrics.sqsMessagesInflight, + metrics.sqsMessagesReturnedTotal, + metrics.sqsMessagesDeletedTotal, + metrics.sqsMessagesWaiting, + metrics.sqsWorkerUtilization, + metrics.sqsMessageProcessingTime, + metrics.sqsLagTime, + metrics.s3ObjectsRequestedTotal, + metrics.s3ObjectsAckedTotal, + metrics.s3ObjectsListedTotal, + metrics.s3ObjectsProcessedTotal, + metrics.s3BytesProcessedTotal, + metrics.s3EventsCreatedTotal, + metrics.s3ObjectsInflight, + metrics.s3ObjectProcessingTime) +} + func TestInputMetricsSQSWorkerUtilization(t *testing.T) { const interval = 5000 From 9d4e69acd0a574d8941c0af145856591bf4eb883 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Tue, 18 Jul 2023 20:27:10 +0200 Subject: [PATCH 2/8] Fix PR comment --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/input/awss3/input.go | 4 ++-- x-pack/filebeat/input/awss3/metrics.go | 15 +++------------ x-pack/filebeat/input/awss3/metrics_test.go | 2 ++ 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 1df7f4bb3418..faa090915f95 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -797,7 +797,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. -| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. A value of (0) indicates `no messages waiting` but a value 0f (-1) indicates `metrics not collected/error collecting metrics ` | `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 221084881f85..a2c54fd6ec42 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -423,12 +423,12 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { switch apiError.ErrorCode() { case sqsAccessDeniedErrorCode: // stop polling if auth error is encountered - receiver.metrics.setSQSMessagesWaiting(int64(count)) + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) return } } - receiver.metrics.setSQSMessagesWaiting(int64(count)) + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) } } } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index 0bff7d057cde..bef57210ca65 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -79,18 +79,6 @@ func (m *inputMetrics) Close() { m.unregister() } -func (m *inputMetrics) setSQSMessagesWaiting(count int64) { - if m.sqsMessagesWaiting == nil { - // if metric not initialized, and count is -1, do nothing - if count == -1 { - return - } - m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") - } - - m.sqsMessagesWaiting.Set(count) -} - // beginSQSWorker tracks the start of a new SQS worker. The returned ID // must be used to call endSQSWorker when the worker finishes. It also // increments the sqsMessagesInflight counter. @@ -187,6 +175,9 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"), s3ObjectProcessingTime: metrics.NewUniformSample(1024), } + + // Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1). + out.sqsMessagesWaiting.Set(int64(-1)) adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept). diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index 180310881eb2..e153d321e9f1 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -57,6 +57,8 @@ func TestNewInputMetricsInstance(t *testing.T) { metrics.s3EventsCreatedTotal, metrics.s3ObjectsInflight, metrics.s3ObjectProcessingTime) + + assert.Equal(t, int64(-1), metrics.sqsMessagesWaiting.Get()) } func TestInputMetricsSQSWorkerUtilization(t *testing.T) { From fd79013a97888cc25d724e7f67d0d6d7c01fa0d1 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Wed, 19 Jul 2023 08:58:54 +0200 Subject: [PATCH 3/8] Initialize metric to 0 if SQS messages are received --- x-pack/filebeat/input/awss3/sqs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 01ed8bfb1835..9d095208ac8b 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -79,7 +79,11 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) + // Initialize the sqs_message_waiting_guage to 0 to indicate that that SQS messages are received. + // PollSqsWaitingMetric shall reassign the value if there are messages waiting or if there is an error in processing the messages. + r.metrics.sqsMessagesWaiting.Set(int64(0)) workerWg.Add(len(msgs)) + for _, msg := range msgs { go func(msg types.Message, start time.Time) { id := r.metrics.beginSQSWorker() From a44508b3666bdcb65092dfcab4fc2823529ac4b8 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Wed, 19 Jul 2023 09:26:47 +0200 Subject: [PATCH 4/8] Set the metric count to -1 in error case --- x-pack/filebeat/input/awss3/input.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index a2c54fd6ec42..b4b87a400f4c 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -423,7 +423,8 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { switch apiError.ErrorCode() { case sqsAccessDeniedErrorCode: // stop polling if auth error is encountered - receiver.metrics.sqsMessagesWaiting.Set(int64(count)) + // Set it back to -1 because there is a permission error + receiver.metrics.sqsMessagesWaiting.Set(int64(-1)) return } } From 3252c1e21abb48fd42ce8591d1b50c0aaf74e29e Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Thu, 20 Jul 2023 08:36:31 +0200 Subject: [PATCH 5/8] Move the metric initialization --- x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/input/awss3/input.go | 4 ++++ x-pack/filebeat/input/awss3/sqs.go | 3 --- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index faa090915f95..794a51de0819 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -797,7 +797,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. -| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. A value of (0) indicates `no messages waiting` but a value 0f (-1) indicates `metrics not collected/error collecting metrics ` +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html. A value of `-1` indicates the metric is uninitialized or could not be collected due to an error. | `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index b4b87a400f4c..b7f2abfe41f7 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -411,6 +411,10 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { t := time.NewTicker(time.Minute) defer t.Stop() + // Initialize the metric to 0 at the start of the one minute time interval to avoid + // giving misleading metric value -1 even though SQS messages are processed. + // The value will be updated every minute + receiver.metrics.sqsMessagesWaiting.Set(int64(0)) for { select { case <-ctx.Done(): diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 9d095208ac8b..dd454a3bfb92 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -79,9 +79,6 @@ func (r *sqsReader) Receive(ctx context.Context) error { // Process each SQS message asynchronously with a goroutine. r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) - // Initialize the sqs_message_waiting_guage to 0 to indicate that that SQS messages are received. - // PollSqsWaitingMetric shall reassign the value if there are messages waiting or if there is an error in processing the messages. - r.metrics.sqsMessagesWaiting.Set(int64(0)) workerWg.Add(len(msgs)) for _, msg := range msgs { From fec1505504b9c4227b51822fdf51cd274098a900 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 24 Jul 2023 08:47:19 +0200 Subject: [PATCH 6/8] Execute GetApproximateMessageCount before timer --- x-pack/filebeat/input/awss3/input.go | 43 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index b7f2abfe41f7..fa2bedfeebe7 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -409,33 +409,44 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { } func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { + // Run GetApproximateMessageCount before start of timer to set initial count for sqs waiting metric + // This is to avoid misleading values in metric when sqs messages are processed before the ticker channel kicks in + if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn { + return + } + t := time.NewTicker(time.Minute) defer t.Stop() - // Initialize the metric to 0 at the start of the one minute time interval to avoid - // giving misleading metric value -1 even though SQS messages are processed. - // The value will be updated every minute - receiver.metrics.sqsMessagesWaiting.Set(int64(0)) for { select { case <-ctx.Done(): return case <-t.C: - count, err := receiver.GetApproximateMessageCount(ctx) - - var apiError smithy.APIError - if errors.As(err, &apiError) { - switch apiError.ErrorCode() { - case sqsAccessDeniedErrorCode: - // stop polling if auth error is encountered - // Set it back to -1 because there is a permission error - receiver.metrics.sqsMessagesWaiting.Set(int64(-1)) - return - } + if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn { + return } + } + } +} - receiver.metrics.sqsMessagesWaiting.Set(int64(count)) +// updateMessageCount runs GetApproximateMessageCount for the given context and updates the receiver metric with the count returning false on no error +// If there is an error, the metric is reinitialized to -1 and true is returned +func updateMessageCount(receiver *sqsReader, ctx context.Context) bool { + count, err := receiver.GetApproximateMessageCount(ctx) + + var apiError smithy.APIError + if errors.As(err, &apiError) { + switch apiError.ErrorCode() { + case sqsAccessDeniedErrorCode: + // stop polling if auth error is encountered + // Set it back to -1 because there is a permission error + receiver.metrics.sqsMessagesWaiting.Set(int64(-1)) + return true } } + + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) + return false } // boolPtr returns a pointer to b. From acbe41be494055fcedfd3884113b3245ea4cc903 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 24 Jul 2023 10:55:27 +0200 Subject: [PATCH 7/8] fix assertion --- x-pack/filebeat/input/awss3/input_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 9e6d4961e9a0..5ddcc933a0b0 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,7 +187,7 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 1) // sqsMessagesWaitingMetric is polled every minute and test completes in less than a minute assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) @@ -426,7 +426,7 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) + assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 1) // sqsMessagesWaitingMetric is polled every minute and test completes in less than a minute assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) From 2463b431cd4aabb667efe359d0c24a4399d681d7 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula Date: Mon, 24 Jul 2023 15:27:21 +0200 Subject: [PATCH 8/8] Remove ambiguous assertion --- x-pack/filebeat/input/awss3/input_integration_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 9e6d4961e9a0..c843e47e93f6 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,7 +187,6 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) @@ -426,7 +425,6 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)