diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 36b0cfcd02c..e1043c96955 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -939,6 +939,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add additional network metrics to docker/network {pull}25354[25354] - Migrate ec2 metricsets to use cloudwatch input. {pull}25924[25924] - Reduce number of requests done by kubernetes metricsets to kubelet. {pull}25782[25782] +- Migrate sqs metricsets to use cloudwatch input. {pull}26117[26117] *Packetbeat* diff --git a/metricbeat/docs/modules/aws/sqs.asciidoc b/metricbeat/docs/modules/aws/sqs.asciidoc index 7c6129cbacb..625b8185135 100644 --- a/metricbeat/docs/modules/aws/sqs.asciidoc +++ b/metricbeat/docs/modules/aws/sqs.asciidoc @@ -8,6 +8,7 @@ This file is generated! See scripts/mage/docs_collector.go include::../../../../x-pack/metricbeat/module/aws/sqs/_meta/docs.asciidoc[] +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. ==== Fields diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index 9aac3529bf0..7df3285fae8 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -15,7 +15,6 @@ import ( _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/billing" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/rds" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/sqs" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/awsfargate" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/awsfargate/task_stats" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/azure" diff --git a/x-pack/metricbeat/module/aws/cloudwatch/metadata.go b/x-pack/metricbeat/module/aws/cloudwatch/metadata.go index 335f38e5b49..d91e50e9c0a 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/metadata.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/metadata.go @@ -9,11 +9,13 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch/ec2" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch/sqs" ) // AWS namespaces const ( namespaceEC2 = "AWS/EC2" + namespaceSQS = "AWS/SQS" ) // addMetadata adds metadata to the given events map based on namespace @@ -21,6 +23,8 @@ func addMetadata(namespace string, endpoint string, regionName string, awsConfig switch namespace { case namespaceEC2: return ec2.AddMetadata(endpoint, regionName, awsConfig, events) + case namespaceSQS: + return sqs.AddMetadata(endpoint, regionName, awsConfig, events) default: return events } diff --git a/x-pack/metricbeat/module/aws/cloudwatch/sqs/metadata.go b/x-pack/metricbeat/module/aws/cloudwatch/sqs/metadata.go new file mode 100644 index 00000000000..4242e346c82 --- /dev/null +++ b/x-pack/metricbeat/module/aws/cloudwatch/sqs/metadata.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package sqs + +import ( + "context" + "fmt" + "strings" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +const metadataPrefix = "aws.sqs.queue" + +// AddMetadata adds metadata for SQS queues from a specific region +func AddMetadata(endpoint string, regionName string, awsConfig awssdk.Config, events map[string]mb.Event) map[string]mb.Event { + svc := sqs.New(awscommon.EnrichAWSConfigWithEndpoint( + endpoint, "sqs", regionName, awsConfig)) + + // Get queueUrls for each region + queueURLs, err := getQueueUrls(svc) + if err != nil { + logp.Error(fmt.Errorf("getQueueUrls failed, skipping region %s: %w", regionName, err)) + return events + } + + // collect monitoring state for each instance + for _, queueURL := range queueURLs { + queueURLParsed := strings.Split(queueURL, "/") + queueName := queueURLParsed[len(queueURLParsed)-1] + if _, ok := events[queueName]; !ok { + continue + } + events[queueName].RootFields.Put(metadataPrefix+"name", queueName) + } + return events +} + +func getQueueUrls(svc sqsiface.ClientAPI) ([]string, error) { + // ListQueues + listQueuesInput := &sqs.ListQueuesInput{} + req := svc.ListQueuesRequest(listQueuesInput) + output, err := req.Send(context.TODO()) + if err != nil { + err = errors.Wrap(err, "Error ListQueues") + return nil, err + } + return output.QueueUrls, nil +} diff --git a/x-pack/metricbeat/module/aws/module.yml b/x-pack/metricbeat/module/aws/module.yml index 0ccce838bc7..71e476b6426 100644 --- a/x-pack/metricbeat/module/aws/module.yml +++ b/x-pack/metricbeat/module/aws/module.yml @@ -3,6 +3,7 @@ metricsets: - ec2 - elb - ebs + - sqs - usage - sns - s3_daily_storage diff --git a/x-pack/metricbeat/module/aws/sqs/_meta/data.json b/x-pack/metricbeat/module/aws/sqs/_meta/data.json index 56785d840d2..acf033022d7 100644 --- a/x-pack/metricbeat/module/aws/sqs/_meta/data.json +++ b/x-pack/metricbeat/module/aws/sqs/_meta/data.json @@ -1,6 +1,12 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", "aws": { + "cloudwatch": { + "namespace": "AWS/SQS" + }, + "dimensions": { + "QueueName": "filebeat-aws-elb-test" + }, "sqs": { "empty_receives": 0, "messages": { @@ -8,25 +14,28 @@ "deleted": 0, "not_visible": 0, "received": 0, - "sent": 0, - "visible": 2 + "sent": 0.2857142857142857, + "visible": 827.8 }, "oldest_message_age": { - "sec": 78494 + "sec": 345606.4 }, - "queue": { - "name": "test-s3-notification" - }, - "sent_message_size": {} + "queuename": "filebeat-aws-elb-test", + "sent_message_size": { + "bytes": 1006.5 + } + }, + "tags": { + "created-by": "kaiyan" } }, "cloud": { "account": { - "id": "627959692251", - "name": "elastic-test" + "id": "428152502467", + "name": "elastic-beats" }, "provider": "aws", - "region": "ap-southeast-1" + "region": "eu-central-1" }, "event": { "dataset": "aws.sqs", diff --git a/x-pack/metricbeat/module/aws/sqs/data.go b/x-pack/metricbeat/module/aws/sqs/data.go deleted file mode 100644 index 58efdc1307c..00000000000 --- a/x-pack/metricbeat/module/aws/sqs/data.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package sqs - -import ( - s "github.com/elastic/beats/v7/libbeat/common/schema" - c "github.com/elastic/beats/v7/libbeat/common/schema/mapstrstr" -) - -var ( - schemaRequestFields = s.Schema{ - "oldest_message_age": s.Object{ - "sec": c.Float("ApproximateAgeOfOldestMessage"), - }, - "messages": s.Object{ - "delayed": c.Float("ApproximateNumberOfMessagesDelayed"), - "not_visible": c.Float("ApproximateNumberOfMessagesNotVisible"), - "visible": c.Float("ApproximateNumberOfMessagesVisible"), - "deleted": c.Float("NumberOfMessagesDeleted"), - "received": c.Float("NumberOfMessagesReceived"), - "sent": c.Float("NumberOfMessagesSent"), - }, - "empty_receives": c.Float("NumberOfEmptyReceives"), - "sent_message_size": s.Object{ - "bytes": c.Float("SentMessageSize"), - }, - "queue.name": c.Str("QueueName"), - } -) diff --git a/x-pack/metricbeat/module/aws/sqs/manifest.yml b/x-pack/metricbeat/module/aws/sqs/manifest.yml new file mode 100644 index 00000000000..2fe23d4d545 --- /dev/null +++ b/x-pack/metricbeat/module/aws/sqs/manifest.yml @@ -0,0 +1,46 @@ +default: true +input: + module: aws + metricset: cloudwatch + defaults: + metrics: + - namespace: AWS/SQS + resource_type: sqs + statistic: ["Average"] + name: + - ApproximateAgeOfOldestMessage + - ApproximateNumberOfMessagesDelayed + - ApproximateNumberOfMessagesNotVisible + - ApproximateNumberOfMessagesVisible + - NumberOfMessagesDeleted + - NumberOfMessagesReceived + - NumberOfMessagesSent + - NumberOfEmptyReceives + - SentMessageSize +processors: + - rename: + ignore_missing: true + fields: + - from: "aws.sqs.metrics.ApproximateAgeOfOldestMessage.avg" + to: "aws.sqs.oldest_message_age.sec" + - from: "aws.sqs.metrics.ApproximateNumberOfMessagesDelayed.avg" + to: "aws.sqs.messages.delayed" + - from: "aws.sqs.metrics.ApproximateNumberOfMessagesNotVisible.avg" + to: "aws.sqs.messages.not_visible" + - from: "aws.sqs.metrics.ApproximateNumberOfMessagesVisible.avg" + to: "aws.sqs.messages.visible" + - from: "aws.sqs.metrics.NumberOfMessagesDeleted.avg" + to: "aws.sqs.messages.deleted" + - from: "aws.sqs.metrics.NumberOfMessagesReceived.avg" + to: "aws.sqs.messages.received" + - from: "aws.sqs.metrics.NumberOfMessagesSent.avg" + to: "aws.sqs.messages.sent" + - from: "aws.sqs.metrics.NumberOfEmptyReceives.avg" + to: "aws.sqs.empty_receives" + - from: "aws.sqs.metrics.SentMessageSize.avg" + to: "aws.sqs.sent_message_size.bytes" + + - drop_fields: + ignore_missing: true + fields: + - "aws.sqs.metrics" diff --git a/x-pack/metricbeat/module/aws/sqs/sqs.go b/x-pack/metricbeat/module/aws/sqs/sqs.go deleted file mode 100644 index 5f17eccb4b1..00000000000 --- a/x-pack/metricbeat/module/aws/sqs/sqs.go +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package sqs - -import ( - "context" - "fmt" - "strconv" - "strings" - "time" - - "github.com/aws/aws-sdk-go-v2/service/cloudwatch" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface" - "github.com/pkg/errors" - - s "github.com/elastic/beats/v7/libbeat/common/schema" - "github.com/elastic/beats/v7/metricbeat/mb" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" -) - -var metricsetName = "sqs" - -// init registers the MetricSet with the central registry as soon as the program -// starts. The New function will be called later to instantiate an instance of -// the MetricSet for each host defined in the module's configuration. After the -// MetricSet has been created then Fetch will begin to be called periodically. -func init() { - mb.Registry.MustAddMetricSet(aws.ModuleName, metricsetName, New) -} - -// MetricSet holds any configuration or state information. It must implement -// the mb.MetricSet interface. And this is best achieved by embedding -// mb.BaseMetricSet because it implements all of the required mb.MetricSet -// interface methods except for Fetch. -type MetricSet struct { - *aws.MetricSet -} - -// New creates a new instance of the MetricSet. New is responsible for unpacking -// any MetricSet specific configuration options if there are any. -func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - metricSet, err := aws.NewMetricSet(base) - if err != nil { - return nil, errors.Wrap(err, "error creating aws metricset") - } - - // Check if period is set to be multiple of 300s - remainder := int(metricSet.Period.Seconds()) % 300 - if remainder != 0 { - err := errors.New("period needs to be set to 300s (or a multiple of 300s). " + - "To avoid data missing or extra costs, please make sure period is set correctly in config.yml") - base.Logger().Info(err) - } - - return &MetricSet{ - MetricSet: metricSet, - }, nil -} - -// Fetch methods implements the data gathering and data conversion to the right -// format. It publishes the event which is then forwarded to the output. In case -// of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - namespace := "AWS/SQS" - // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency) - m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime) - - for _, regionName := range m.MetricSet.RegionsList { - awsConfig := m.MetricSet.AwsConfig.Copy() - awsConfig.Region = regionName - - svcCloudwatch := cloudwatch.New(awscommon.EnrichAWSConfigWithEndpoint( - m.Endpoint, "monitoring", regionName, awsConfig)) - - svcSQS := sqs.New(awscommon.EnrichAWSConfigWithEndpoint( - m.Endpoint, "sqs", regionName, awsConfig)) - - // Get queueUrls for each region - queueURLs, err := getQueueUrls(svcSQS) - if err != nil { - m.Logger().Error(err.Error()) - report.Error(err) - continue - } - if len(queueURLs) == 0 { - continue - } - - // Get listMetrics output - listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch) - if err != nil { - m.Logger().Error(err.Error()) - report.Error(err) - continue - } - if listMetricsOutput == nil || len(listMetricsOutput) == 0 { - continue - } - - // Construct metricDataQueries - metricDataQueries := constructMetricQueries(listMetricsOutput, m.Period) - if len(metricDataQueries) == 0 { - continue - } - - // Use metricDataQueries to make GetMetricData API calls - metricDataResults, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime) - if err != nil { - err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName) - m.Logger().Error(err.Error()) - report.Error(err) - continue - } - - // Create Cloudwatch Events for SQS - err = createSQSEvents(queueURLs, metricDataResults, regionName, report, m.AccountName, m.AccountID) - if err != nil { - m.Logger().Debug("Error trying to emit event") - return nil - } - } - - return nil -} - -func getQueueUrls(svc sqsiface.ClientAPI) ([]string, error) { - // ListQueues - listQueuesInput := &sqs.ListQueuesInput{} - req := svc.ListQueuesRequest(listQueuesInput) - output, err := req.Send(context.TODO()) - if err != nil { - err = errors.Wrap(err, "Error ListQueues") - return nil, err - } - return output.QueueUrls, nil -} - -func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, period time.Duration) []cloudwatch.MetricDataQuery { - var metricDataQueries []cloudwatch.MetricDataQuery - for i, listMetric := range listMetricsOutput { - metricDataQuery := createMetricDataQuery(listMetric, i, period) - metricDataQueries = append(metricDataQueries, metricDataQuery) - } - return metricDataQueries -} - -func createMetricDataQuery(metric cloudwatch.Metric, index int, period time.Duration) (metricDataQuery cloudwatch.MetricDataQuery) { - statistic := "Average" - periodInSec := int64(period.Seconds()) - id := "sqs" + strconv.Itoa(index) - metricDims := metric.Dimensions - metricName := *metric.MetricName - queueName := "" - for _, dim := range metricDims { - if *dim.Name == "QueueName" { - queueName = *dim.Value - } - } - label := queueName + " " + metricName - - metricDataQuery = cloudwatch.MetricDataQuery{ - Id: &id, - MetricStat: &cloudwatch.MetricStat{ - Period: &periodInSec, - Stat: &statistic, - Metric: &metric, - }, - Label: &label, - } - return -} - -func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, queueName string, regionName string, schemaMetricFields s.Schema, accountName string, accountID string) (event mb.Event, err error) { - // AWS sqs metrics - mapOfMetricSetFieldResults := make(map[string]interface{}) - - // Find a timestamp for all metrics in output - timestamp := aws.FindTimestamp(getMetricDataResults) - if !timestamp.IsZero() { - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { - continue - } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, " ") - if labels[0] == queueName && len(output.Values) > timestampIdx { - mapOfMetricSetFieldResults[labels[1]] = fmt.Sprint(output.Values[timestampIdx]) - } - } - } - } - - resultMetricSetFields, err := aws.EventMapping(mapOfMetricSetFieldResults, schemaMetricFields) - if err != nil { - err = errors.Wrap(err, "Error trying to apply schemaMetricSetFields in AWS SQS metricbeat module.") - return - } - - event = aws.InitEvent(regionName, accountName, accountID, timestamp) - event.MetricSetFields = resultMetricSetFields - event.MetricSetFields.Put("queue.name", queueName) - return -} - -func createSQSEvents(queueURLs []string, metricDataResults []cloudwatch.MetricDataResult, regionName string, report mb.ReporterV2, accountName string, accountID string) error { - for _, queueURL := range queueURLs { - queueURLParsed := strings.Split(queueURL, "/") - queueName := queueURLParsed[len(queueURLParsed)-1] - event, err := createEventPerQueue(metricDataResults, queueName, regionName, schemaRequestFields, accountName, accountID) - if err != nil { - event.Error = err - report.Event(event) - continue - } - - if reported := report.Event(event); !reported { - return errors.Wrap(err, "Fetch interrupted, failed to emit event") - } - } - - return nil -} diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go index 24f1c314b73..866cb99edd8 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_integration_test.go @@ -10,11 +10,11 @@ package sqs import ( "testing" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" - "github.com/stretchr/testify/assert" + _ "github.com/elastic/beats/v7/libbeat/processors/actions" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" ) func TestFetch(t *testing.T) { @@ -33,8 +33,6 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { config := mtest.GetConfigForTest(t, "sqs", "300s") - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - if err := mbtest.WriteEventsReporterV2Error(metricSet, t, "/"); err != nil { - t.Fatal("write", err) - } + metricSet := mbtest.NewFetcher(t, config) + metricSet.WriteEvents(t, "/") } diff --git a/x-pack/metricbeat/module/aws/sqs/sqs_test.go b/x-pack/metricbeat/module/aws/sqs/sqs_test.go index 0468119a9c1..1f049ed4dca 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs_test.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs_test.go @@ -2,40 +2,20 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -// +build !integration - package sqs import ( - "net/http" - "testing" - - awssdk "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/sqsiface" - "github.com/stretchr/testify/assert" -) + "os" -// MockSQSClient struct is used for unit tests. -type MockSQSClient struct { - sqsiface.ClientAPI -} + "github.com/elastic/beats/v7/metricbeat/mb" -func (m *MockSQSClient) ListQueuesRequest(input *sqs.ListQueuesInput) sqs.ListQueuesRequest { - httpReq, _ := http.NewRequest("", "", nil) - return sqs.ListQueuesRequest{ - Request: &awssdk.Request{ - Data: &sqs.ListQueuesOutput{ - QueueUrls: []string{"https://sqs.us-east-1.amazonaws.com/123/sqs1", "https://sqs.us-east-1.amazonaws.com/123/sqs2"}, - }, - HTTPRequest: httpReq, - }, - } -} + // Register input module and metricset + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" +) -func TestGetQueueUrls(t *testing.T) { - mockSvc := &MockSQSClient{} - queueUrls, err := getQueueUrls(mockSvc) - assert.NoError(t, err) - assert.Equal(t, []string{"https://sqs.us-east-1.amazonaws.com/123/sqs1", "https://sqs.us-east-1.amazonaws.com/123/sqs2"}, queueUrls) +func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) }