diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b8e17455a33..3f16509e8c5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -639,6 +639,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Refactor state_* metricsets to share response from endpoint. {pull}25640[25640] - Add server id to zookeeper events. {pull}25550[25550] - Add additional network metrics to docker/network {pull}25354[25354] +- Migrate ec2 metricsets to use cloudwatch input. {pull}25924[25924] - Reduce number of requests done by kubernetes metricsets to kubelet. {pull}25782[25782] *Packetbeat* diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index 15a643ec929..9aac3529bf0 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -14,7 +14,6 @@ import ( _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/billing" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" - _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/ec2" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/rds" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/sqs" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/awsfargate" diff --git a/x-pack/metricbeat/module/aws/cloudwatch/_meta/data.json b/x-pack/metricbeat/module/aws/cloudwatch/_meta/data.json index d7b8b1f83e4..983559a08b7 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/_meta/data.json +++ b/x-pack/metricbeat/module/aws/cloudwatch/_meta/data.json @@ -5,51 +5,199 @@ "namespace": "AWS/RDS" }, "dimensions": { - "EngineName": "mariadb" + "DBClusterIdentifier": "database-1", + "Role": "READER" }, "rds": { "metrics": { - "BinLogDiskUsage": { - "avg": 2803, + "AbortedClients": { + "avg": 0, "count": 5, - "max": 3007, - "min": 2752, - "sum": 14015 + "max": 0, + "min": 0, + "sum": 0 }, - "CPUCreditBalance": { - "avg": 144, - "count": 1, - "max": 144, - "min": 144, - "sum": 144 + "ActiveTransactions": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 }, - "CPUCreditUsage": { - "avg": 0.062006, - "count": 1, - "max": 0.062006, - "min": 0.062006, - "sum": 0.062006 + "AuroraBinlogReplicaLag": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "AuroraReplicaLag": { + "avg": 18.4158, + "count": 5, + "max": 23.787, + "min": 10.634, + "sum": 92.07900000000001 }, - "CPUSurplusCreditBalance": { + "AuroraVolumeBytesLeftTotal": { + "avg": 70007366615040, + "count": 5, + "max": 70007366615040, + "min": 70007366615040, + "sum": 350036833075200 + }, + "Aurora_pq_request_attempted": { "avg": 0, - "count": 1, + "count": 5, "max": 0, "min": 0, "sum": 0 }, - "CPUSurplusCreditsCharged": { + "Aurora_pq_request_executed": { "avg": 0, - "count": 1, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_failed": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_in_progress": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_not_chosen": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_not_chosen_below_min_rows": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_not_chosen_few_pages_outside_buffer_pool": { + "avg": 0, + "count": 5, "max": 0, "min": 0, "sum": 0 }, + "Aurora_pq_request_not_chosen_long_trx": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 
0 + }, + "Aurora_pq_request_not_chosen_pq_high_buffer_pool_pct": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_not_chosen_small_table": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_not_chosen_unsupported_access": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "Aurora_pq_request_throttled": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "BlockedTransactions": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "BufferCacheHitRatio": { + "avg": 100, + "count": 5, + "max": 100, + "min": 100, + "sum": 500 + }, "CPUUtilization": { - "avg": 1.2662313605690698, + "avg": 6.051666111792592, + "count": 5, + "max": 6.216563057282379, + "min": 5.808333333333334, + "sum": 30.25833055896296 + }, + "CommitLatency": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "CommitThroughput": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "ConnectionAttempts": { + "avg": 0, "count": 5, - "max": 1.66666666666667, - "min": 1.01694915255224, - "sum": 6.331156802845349 + "max": 0, + "min": 0, + "sum": 0 + }, + "DDLLatency": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "DDLThroughput": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "DMLLatency": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "DMLThroughput": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 }, "DatabaseConnections": { "avg": 0, @@ -58,47 +206,117 @@ "min": 0, "sum": 0 }, - "DiskQueueDepth": { - "avg": 0.00009332666777759262, + "Deadlocks": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "DeleteLatency": { + "avg": 0, "count": 5, - "max": 0.0002666666666666667, + "max": 0, "min": 0, - "sum": 0.0004666333388879631 + "sum": 0 }, - "FreeStorageSpace": { - "avg": 20402470912, + "DeleteThroughput": { + "avg": 0, "count": 5, - "max": 20402470912, - "min": 20402470912, - "sum": 102012354560 + "max": 0, + "min": 0, + "sum": 0 + }, + "EBSByteBalance%": { + "avg": 99, + "count": 1, + "max": 99, + "min": 99, + "sum": 99 + }, + "EBSIOBalance%": { + "avg": 99, + "count": 1, + "max": 99, + "min": 99, + "sum": 99 + }, + "EngineUptime": { + "avg": 20800826, + "count": 5, + "max": 20800946, + "min": 20800706, + "sum": 104004130 + }, + "FreeLocalStorage": { + "avg": 29682751078.4, + "count": 5, + "max": 29682819072, + "min": 29682675712, + "sum": 148413755392 }, "FreeableMemory": { - "avg": 446488576, + "avg": 4639068160, "count": 5, - "max": 446668800, - "min": 446275584, - "sum": 2232442880 + "max": 4639838208, + "min": 4638638080, + "sum": 23195340800 + }, + "InsertLatency": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "InsertThroughput": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "LoginFailures": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 }, "NetworkReceiveThroughput": { - "avg": 407.83898884048796, + "avg": 0.8399323667305664, + "count": 5, + "max": 1.399556807011113, + "min": 0.6999533364442371, + "sum": 4.199661833652832 + }, + "NetworkThroughput": { + "avg": 1.6798647334611327, "count": 5, - "max": 499.9416676388727, - "min": 354.27742870952153, - "sum": 2039.1949442024397 + "max": 2.799113614022226, + "min": 1.3999066728884741, + "sum": 8.399323667305664 }, "NetworkTransmitThroughput": { - "avg": 2667.0383202542657, + "avg": 0.8399323667305664, "count": 5, - "max": 3228.9489909846857, - "min": 
2274.512091465142, - "sum": 13335.191601271328 + "max": 1.399556807011113, + "min": 0.6999533364442371, + "sum": 4.199661833652832 }, - "ReadIOPS": { - "avg": 0.2333294445092582, + "NumBinaryLogFiles": { + "avg": 0, "count": 5, - "max": 1.166647222546291, + "max": 0, "min": 0, - "sum": 1.166647222546291 + "sum": 0 + }, + "Queries": { + "avg": 6.3836833181909265, + "count": 5, + "max": 6.53289780681288, + "min": 6.184260972479205, + "sum": 31.91841659095463 }, "ReadLatency": { "avg": 0, @@ -107,51 +325,79 @@ "min": 0, "sum": 0 }, - "ReadThroughput": { - "avg": 136.53333333333333, + "ResultSetCacheHitRatio": { + "avg": 0, "count": 5, - "max": 682.6666666666666, + "max": 0, "min": 0, - "sum": 682.6666666666666 + "sum": 0 }, - "SwapUsage": { - "avg": 5287936, + "RollbackSegmentHistoryListLength": { + "avg": 0, "count": 5, - "max": 5287936, - "min": 5287936, - "sum": 26439680 + "max": 0, + "min": 0, + "sum": 0 }, - "WriteIOPS": { - "avg": 0.27999083424342597, + "RowLockTime": { + "avg": 0, "count": 5, - "max": 0.6999883335277746, + "max": 0, "min": 0, - "sum": 1.39995417121713 + "sum": 0 }, - "WriteLatency": { - "avg": 0.00009062937062937063, + "SelectLatency": { + "avg": 0.2519199153394592, + "count": 5, + "max": 0.2609050632911392, + "min": 0.24367924528301885, + "sum": 1.2595995766972958 + }, + "SelectThroughput": { + "avg": 2.6002296989354514, + "count": 5, + "max": 2.650618477644784, + "min": 2.5335866920025336, + "sum": 13.001148494677256 + }, + "SumBinaryLogSize": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 + }, + "UpdateLatency": { + "avg": 0, "count": 5, - "max": 0.0003076923076923077, + "max": 0, "min": 0, - "sum": 0.00045314685314685316 + "sum": 0 }, - "WriteThroughput": { - "avg": 2621.2741374938682, + "UpdateThroughput": { + "avg": 0, "count": 5, - "max": 7441.066666666667, + "max": 0, "min": 0, - "sum": 13106.370687469342 + "sum": 0 + }, + "WriteLatency": { + "avg": 0, + "count": 5, + "max": 0, + "min": 0, + "sum": 0 } } } }, "cloud": { "account": { - "id": "627959692251", - "name": "elastic-test" + "id": "428152502467", + "name": "elastic-beats" }, "provider": "aws", - "region": "ap-southeast-1" + "region": "eu-west-1" }, "event": { "dataset": "aws.cloudwatch", diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index af04508a111..31df9f638d6 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -182,6 +182,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { } } + // Create events based on namespaceDetailTotal from configuration for _, regionName := range m.MetricSet.RegionsList { m.logger.Debugf("Collecting metrics from AWS region %s", regionName) awsConfig := m.MetricSet.AwsConfig.Copy() @@ -193,7 +194,6 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { svcResourceAPI := resourcegroupstaggingapi.New(awscommon.EnrichAWSConfigWithEndpoint( m.Endpoint, "tagging", regionName, awsConfig)) - // Create events based on namespaceDetailTotal from configuration for namespace, namespaceDetails := range namespaceDetailTotal { m.logger.Debugf("Collected metrics from namespace %s", namespace) @@ -219,7 +219,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { m.logger.Debugf("Collected number of metrics = %d", len(eventsWithIdentifier)) - err = reportEvents(eventsWithIdentifier, report) + err = reportEvents(addMetadata(namespace, m.Endpoint, regionName, awsConfig, eventsWithIdentifier), report) if err != 
nil { return errors.Wrap(err, "reportEvents failed") } diff --git a/x-pack/metricbeat/module/aws/cloudwatch/ec2/metadata.go b/x-pack/metricbeat/module/aws/cloudwatch/ec2/metadata.go new file mode 100644 index 00000000000..8637afd8979 --- /dev/null +++ b/x-pack/metricbeat/module/aws/cloudwatch/ec2/metadata.go @@ -0,0 +1,186 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package ec2 + +import ( + "context" + "fmt" + "strings" + + awssdk "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ec2" + "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" +) + +const metadataPrefix = "aws.ec2.instance." + +// AddMetadata adds metadata for EC2 instances from a specific region +func AddMetadata(endpoint string, regionName string, awsConfig awssdk.Config, events map[string]mb.Event) map[string]mb.Event { + svcEC2 := ec2.New(awscommon.EnrichAWSConfigWithEndpoint( + endpoint, "ec2", regionName, awsConfig)) + + instancesOutputs, err := getInstancesPerRegion(svcEC2) + if err != nil { + logp.Error(fmt.Errorf("getInstancesPerRegion failed, skipping region %s: %w", regionName, err)) + return events + } + + // collect monitoring state for each instance + monitoringStates := map[string]string{} + for instanceID, output := range instancesOutputs { + if _, ok := events[instanceID]; !ok { + continue + } + + for _, tag := range output.Tags { + if *tag.Key == "Name" { + events[instanceID].RootFields.Put("cloud.instance.name", *tag.Value) + events[instanceID].RootFields.Put("host.name", *tag.Value) + } + } + + events[instanceID].RootFields.Put("cloud.instance.id", instanceID) + if machineType, err := output.InstanceType.MarshalValue(); err == nil { + events[instanceID].RootFields.Put("cloud.machine.type", machineType) + } else { + logp.Error(fmt.Errorf("InstanceType.MarshalValue failed: %w", err)) + } + + placement := output.Placement + if placement != nil { + events[instanceID].RootFields.Put("cloud.availability_zone", *placement.AvailabilityZone) + } + + if instanceStateName, err := output.State.Name.MarshalValue(); err == nil { + events[instanceID].RootFields.Put(metadataPrefix+"state.name", instanceStateName) + } else { + logp.Error(fmt.Errorf("instance.State.Name.MarshalValue failed: %w", err)) + } + + if monitoringState, err := output.Monitoring.State.MarshalValue(); err == nil { + monitoringStates[instanceID] = monitoringState + events[instanceID].RootFields.Put(metadataPrefix+"monitoring.state", monitoringState) + } else { + logp.Error(fmt.Errorf("Monitoring.State.MarshalValue failed: %w", err)) + } + + cpuOptions := output.CpuOptions + if cpuOptions != nil { + events[instanceID].RootFields.Put(metadataPrefix+"core.count", *cpuOptions.CoreCount) + events[instanceID].RootFields.Put(metadataPrefix+"threads_per_core", *cpuOptions.ThreadsPerCore) + } + + publicIP := output.PublicIpAddress + if publicIP != nil { + events[instanceID].RootFields.Put(metadataPrefix+"public.ip", *publicIP) + } + + privateIP := output.PrivateIpAddress + if privateIP != nil { + events[instanceID].RootFields.Put(metadataPrefix+"private.ip", *privateIP) + } + + events[instanceID].RootFields.Put(metadataPrefix+"image.id", *output.ImageId) + 
events[instanceID].RootFields.Put(metadataPrefix+"state.code", *output.State.Code) + events[instanceID].RootFields.Put(metadataPrefix+"public.dns_name", *output.PublicDnsName) + events[instanceID].RootFields.Put(metadataPrefix+"private.dns_name", *output.PrivateDnsName) + + // add host cpu/network/disk fields and host.id + addHostFields(events[instanceID], instanceID) + + // add rate metrics + calculateRate(events[instanceID], monitoringStates[instanceID]) + } + return events +} + +func getInstancesPerRegion(svc ec2iface.ClientAPI) (map[string]*ec2.Instance, error) { + instancesOutputs := map[string]*ec2.Instance{} + output := ec2.DescribeInstancesOutput{NextToken: nil} + init := true + for init || output.NextToken != nil { + init = false + describeInstanceInput := &ec2.DescribeInstancesInput{} + req := svc.DescribeInstancesRequest(describeInstanceInput) + output, err := req.Send(context.Background()) + if err != nil { + err = errors.Wrap(err, "Error DescribeInstances") + return nil, err + } + + for _, reservation := range output.Reservations { + for _, instance := range reservation.Instances { + instancesOutputs[*instance.InstanceId] = &instance + } + } + } + return instancesOutputs, nil +} + +func addHostFields(event mb.Event, instanceID string) { + event.RootFields.Put("host.id", instanceID) + + // If there is no instance name, use instance ID as the host.name + hostName, err := event.RootFields.GetValue("host.name") + if err == nil && hostName != nil { + event.RootFields.Put("host.name", hostName) + } else { + event.RootFields.Put("host.name", instanceID) + } + + hostFieldTable := map[string]string{ + "aws.ec2.metrics.CPUUtilization.avg": "host.cpu.usage", + "aws.ec2.metrics.NetworkIn.sum": "host.network.ingress.bytes", + "aws.ec2.metrics.NetworkOut.sum": "host.network.egress.bytes", + "aws.ec2.metrics.NetworkPacketsIn.sum": "host.network.ingress.packets", + "aws.ec2.metrics.NetworkPacketsOut.sum": "host.network.egress.packets", + "aws.ec2.metrics.DiskReadBytes.sum": "host.disk.read.bytes", + "aws.ec2.metrics.DiskWriteBytes.sum": "host.disk.write.bytes", + } + + for ec2MetricName, hostMetricName := range hostFieldTable { + metricValue, err := event.RootFields.GetValue(ec2MetricName) + if err != nil { + continue + } + + if value, ok := metricValue.(float64); ok { + if ec2MetricName == "cpu.total.pct" { + value = value / 100 + } + event.RootFields.Put(hostMetricName, value) + } + } +} + +func calculateRate(event mb.Event, monitoringState string) { + var period = 300.0 + if monitoringState != "disabled" { + period = 60.0 + } + + metricList := []string{ + "aws.ec2.metrics.NetworkIn.sum", + "aws.ec2.metrics.NetworkOut.sum", + "aws.ec2.metrics.NetworkPacketsIn.sum", + "aws.ec2.metrics.NetworkPacketsOut.sum", + "aws.ec2.metrics.DiskReadBytes.sum", + "aws.ec2.metrics.DiskWriteBytes.sum", + "aws.ec2.metrics.DiskReadOps.sum", + "aws.ec2.metrics.DiskWriteOps.sum"} + + for _, metricName := range metricList { + metricValue, err := event.RootFields.GetValue(metricName) + if err == nil && metricValue != nil { + rateValue := metricValue.(float64) / period + event.RootFields.Put(strings.Replace(metricName, ".sum", ".rate", -1), rateValue) + } + } +} diff --git a/x-pack/metricbeat/module/aws/cloudwatch/metadata.go b/x-pack/metricbeat/module/aws/cloudwatch/metadata.go new file mode 100644 index 00000000000..335f38e5b49 --- /dev/null +++ b/x-pack/metricbeat/module/aws/cloudwatch/metadata.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudwatch + +import ( + awssdk "github.com/aws/aws-sdk-go-v2/aws" + + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch/ec2" +) + +// AWS namespaces +const ( + namespaceEC2 = "AWS/EC2" +) + +// addMetadata adds metadata to the given events map based on namespace +func addMetadata(namespace string, endpoint string, regionName string, awsConfig awssdk.Config, events map[string]mb.Event) map[string]mb.Event { + switch namespace { + case namespaceEC2: + return ec2.AddMetadata(endpoint, regionName, awsConfig, events) + default: + return events + } +} diff --git a/x-pack/metricbeat/module/aws/ec2/_meta/data.json b/x-pack/metricbeat/module/aws/ec2/_meta/data.json index c602fdfb227..a0afcf89fc2 100644 --- a/x-pack/metricbeat/module/aws/ec2/_meta/data.json +++ b/x-pack/metricbeat/module/aws/ec2/_meta/data.json @@ -1,20 +1,22 @@ { "@timestamp": "2017-10-12T08:05:34.853Z", "aws": { + "cloudwatch": { + "namespace": "AWS/EC2" + }, + "dimensions": { + "InstanceId": "i-0853c3c24739b4696" + }, "ec2": { "cpu": { "credit_balance": 288, - "credit_usage": 0.008857266666666667, + "credit_usage": 0.008267983333333333, "surplus_credit_balance": 0, "surplus_credits_charged": 0, "total": { - "pct": 0 + "pct": 0.08999900021110921 } }, - "diskio": { - "read": {}, - "write": {} - }, "instance": { "core": { "count": 1 @@ -41,16 +43,16 @@ }, "network": { "in": { - "bytes": 6557, - "bytes_per_sec": 21.856666666666666, - "packets": 51, - "packets_per_sec": 0.17 + "bytes": 5259, + "bytes_per_sec": 17.53, + "packets": 31, + "packets_per_sec": 0.10333333333333333 }, "out": { - "bytes": 8819, - "bytes_per_sec": 29.39666666666667, - "packets": 39, - "packets_per_sec": 0.13 + "bytes": 12879, + "bytes_per_sec": 42.93, + "packets": 34, + "packets_per_sec": 0.11333333333333333 } }, "status": { @@ -82,18 +84,18 @@ }, "host": { "cpu": { - "usage": 0 + "usage": 0.08999900021110921 }, "id": "i-0853c3c24739b4696", "name": "i-0853c3c24739b4696", "network": { "egress": { - "bytes": 8819, - "packets": 39 + "bytes": 12879, + "packets": 34 }, "ingress": { - "bytes": 6557, - "packets": 51 + "bytes": 5259, + "packets": 31 } } }, diff --git a/x-pack/metricbeat/module/aws/ec2/data.go b/x-pack/metricbeat/module/aws/ec2/data.go deleted file mode 100644 index 6dbc8749b35..00000000000 --- a/x-pack/metricbeat/module/aws/ec2/data.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -package ec2 - -import ( - s "github.com/elastic/beats/v7/libbeat/common/schema" - c "github.com/elastic/beats/v7/libbeat/common/schema/mapstrstr" -) - -var ( - schemaMetricSetFieldsAverage = s.Schema{ - "cpu": s.Object{ - "total": s.Object{ - "pct": c.Float("CPUUtilization"), - }, - "credit_usage": c.Float("CPUCreditUsage"), - "credit_balance": c.Float("CPUCreditBalance"), - "surplus_credit_balance": c.Float("CPUSurplusCreditBalance"), - "surplus_credits_charged": c.Float("CPUSurplusCreditsCharged"), - }, - "status": s.Object{ - "check_failed": c.Int("StatusCheckFailed"), - "check_failed_instance": c.Int("StatusCheckFailed_Instance"), - "check_failed_system": c.Int("StatusCheckFailed_System"), - }, - } - - schemaMetricSetFieldsSum = s.Schema{ - "diskio": s.Object{ - "read": s.Object{ - "bytes": c.Float("DiskReadBytes"), - "count": c.Float("DiskReadOps"), - }, - "write": s.Object{ - "bytes": c.Float("DiskWriteBytes"), - "count": c.Float("DiskWriteOps"), - }, - }, - "network": s.Object{ - "in": s.Object{ - "bytes": c.Float("NetworkIn"), - "packets": c.Float("NetworkPacketsIn"), - }, - "out": s.Object{ - "bytes": c.Float("NetworkOut"), - "packets": c.Float("NetworkPacketsOut"), - }, - }, - } -) diff --git a/x-pack/metricbeat/module/aws/ec2/ec2.go b/x-pack/metricbeat/module/aws/ec2/ec2.go deleted file mode 100644 index 7a373fb5358..00000000000 --- a/x-pack/metricbeat/module/aws/ec2/ec2.go +++ /dev/null @@ -1,452 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package ec2 - -import ( - "context" - "encoding/json" - "fmt" - "strconv" - "time" - - "github.com/aws/aws-sdk-go-v2/service/cloudwatch" - "github.com/aws/aws-sdk-go-v2/service/ec2" - "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" - "github.com/pkg/errors" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/metricbeat/mb" - awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" -) - -var ( - metricsetName = "ec2" - statistics = []string{"Average", "Sum"} -) - -type label struct { - InstanceID string - MetricName string - Statistic string -} - -type idStat struct { - instanceID string - statistic string -} - -// init registers the MetricSet with the central registry as soon as the program -// starts. The New function will be called later to instantiate an instance of -// the MetricSet for each host defined in the module's configuration. After the -// MetricSet has been created then Fetch will begin to be called periodically. -func init() { - mb.Registry.MustAddMetricSet(aws.ModuleName, metricsetName, New, - mb.DefaultMetricSet(), - ) -} - -// MetricSet holds any configuration or state information. It must implement -// the mb.MetricSet interface. And this is best achieved by embedding -// mb.BaseMetricSet because it implements all of the required mb.MetricSet -// interface methods except for Fetch. -type MetricSet struct { - *aws.MetricSet - logger *logp.Logger -} - -// New creates a new instance of the MetricSet. New is responsible for unpacking -// any MetricSet specific configuration options if there are any. 
-func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - logger := logp.NewLogger(metricsetName) - metricSet, err := aws.NewMetricSet(base) - if err != nil { - return nil, errors.Wrap(err, "error creating aws metricset") - } - - // Check if period is set to be multiple of 60s or 300s - remainder300 := int(metricSet.Period.Seconds()) % 300 - remainder60 := int(metricSet.Period.Seconds()) % 60 - if remainder300 != 0 || remainder60 != 0 { - err := errors.New("period needs to be set to 60s (or a multiple of 60s) if detailed monitoring is " + - "enabled for EC2 instances or set to 300s (or a multiple of 300s) if EC2 instances has basic monitoring. " + - "To avoid data missing or extra costs, please make sure period is set correctly in config.yml") - logger.Info(err) - } - - return &MetricSet{ - MetricSet: metricSet, - logger: logger, - }, nil -} - -// Fetch methods implements the data gathering and data conversion to the right -// format. It publishes the event which is then forwarded to the output. In case -// of an error set the Error field of mb.Event or simply call report.Error(). -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - // Get startTime and endTime - startTime, endTime := aws.GetStartTimeEndTime(m.Period, m.Latency) - m.Logger().Debugf("startTime = %s, endTime = %s", startTime, endTime) - - for _, regionName := range m.MetricSet.RegionsList { - awsConfig := m.MetricSet.AwsConfig.Copy() - awsConfig.Region = regionName - - svcEC2 := ec2.New(awscommon.EnrichAWSConfigWithEndpoint( - m.Endpoint, "ec2", regionName, awsConfig)) - - instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2) - if err != nil { - err = errors.Wrap(err, "getInstancesPerRegion failed, skipping region "+regionName) - m.logger.Errorf(err.Error()) - report.Error(err) - continue - } - - svcCloudwatch := cloudwatch.New(awscommon.EnrichAWSConfigWithEndpoint( - m.Endpoint, "monitoring", regionName, awsConfig)) - - namespace := "AWS/EC2" - listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch) - if err != nil { - m.logger.Error(err.Error()) - report.Error(err) - continue - } - - if listMetricsOutput == nil || len(listMetricsOutput) == 0 { - continue - } - - var metricDataQueriesTotal []cloudwatch.MetricDataQuery - for _, instanceID := range instanceIDs { - metricDataQueriesTotal = append(metricDataQueriesTotal, constructMetricQueries(listMetricsOutput, instanceID, m.Period)...) 
- } - - var metricDataOutput []cloudwatch.MetricDataResult - if len(metricDataQueriesTotal) != 0 { - // Use metricDataQueries to make GetMetricData API calls - metricDataOutput, err = aws.GetMetricDataResults(metricDataQueriesTotal, svcCloudwatch, startTime, endTime) - if err != nil { - err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName) - m.logger.Error(err.Error()) - report.Error(err) - continue - } - - // Create Cloudwatch Events for EC2 - events, err := m.createCloudWatchEvents(metricDataOutput, instancesOutputs, regionName) - if err != nil { - m.logger.Error(err.Error()) - report.Error(err) - continue - } - - for _, event := range events { - if len(event.MetricSetFields) != 0 { - if reported := report.Event(event); !reported { - m.logger.Debug("Fetch interrupted, failed to emit event") - return nil - } - } - } - } - } - - return nil -} - -func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID string, period time.Duration) []cloudwatch.MetricDataQuery { - var metricDataQueries []cloudwatch.MetricDataQuery - metricDataQueryEmpty := cloudwatch.MetricDataQuery{} - for i, listMetric := range listMetricsOutput { - for _, statistic := range statistics { - metricDataQuery := createMetricDataQuery(listMetric, instanceID, i, period, statistic) - if metricDataQuery == metricDataQueryEmpty { - continue - } - metricDataQueries = append(metricDataQueries, metricDataQuery) - } - } - return metricDataQueries -} - -func (m *MetricSet) createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, instanceOutput map[string]ec2.Instance, regionName string) (map[string]mb.Event, error) { - // monitoring state for each instance - monitoringStates := map[string]string{} - - // Find a timestamp for all metrics in output - timestamp := aws.FindTimestamp(getMetricDataResults) - if timestamp.IsZero() { - return nil, nil - } - - // Initialize events and metricSetFieldResults per instanceID - events := map[string]mb.Event{} - metricSetFieldResults := map[idStat]map[string]interface{}{} - for instanceID := range instanceOutput { - for _, statistic := range statistics { - events[instanceID] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) - metricSetFieldResults[idStat{instanceID: instanceID, statistic: statistic}] = map[string]interface{}{} - } - } - - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { - continue - } - - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - label, err := newLabelFromJSON(*output.Label) - if err != nil { - m.logger.Errorf("convert cloudwatch MetricDataResult label failed for label = %s: %w", *output.Label, err) - continue - } - - instanceID := label.InstanceID - statistic := label.Statistic - - // Add tags - tags := instanceOutput[instanceID].Tags - if m.TagsFilter != nil { - // Check with each tag filter - // If tag filter doesn't exist in tagKeys/tagValues, - // then do not report this event/instance. - if exists := aws.CheckTagFiltersExist(m.TagsFilter, tags); !exists { - // if tag filter doesn't exist, remove this event initial - // entry to avoid report an empty event. - delete(events, instanceID) - continue - } - } - - // By default, replace dot "." using underscore "_" for tag keys. - // Note: tag values are not dedotted. 
- for _, tag := range tags { - events[instanceID].ModuleFields.Put("tags."+common.DeDot(*tag.Key), *tag.Value) - // add cloud.instance.name and host.name into ec2 events - if *tag.Key == "Name" { - events[instanceID].RootFields.Put("cloud.instance.name", *tag.Value) - events[instanceID].RootFields.Put("host.name", *tag.Value) - } - } - - machineType, err := instanceOutput[instanceID].InstanceType.MarshalValue() - if err != nil { - return events, errors.Wrap(err, "instance.InstanceType.MarshalValue failed") - } - - events[instanceID].RootFields.Put("cloud.instance.id", instanceID) - events[instanceID].RootFields.Put("cloud.machine.type", machineType) - - placement := instanceOutput[instanceID].Placement - if placement != nil { - events[instanceID].RootFields.Put("cloud.availability_zone", *placement.AvailabilityZone) - } - - if len(output.Values) > timestampIdx { - metricSetFieldResults[idStat{instanceID: instanceID, statistic: statistic}][label.MetricName] = fmt.Sprint(output.Values[timestampIdx]) - } - - instanceStateName, err := instanceOutput[instanceID].State.Name.MarshalValue() - if err != nil { - return events, errors.Wrap(err, "instance.State.Name.MarshalValue failed") - } - - monitoringState, err := instanceOutput[instanceID].Monitoring.State.MarshalValue() - if err != nil { - return events, errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed") - } - - monitoringStates[instanceID] = monitoringState - - cpuOptions := instanceOutput[instanceID].CpuOptions - if cpuOptions != nil { - events[instanceID].MetricSetFields.Put("instance.core.count", *cpuOptions.CoreCount) - events[instanceID].MetricSetFields.Put("instance.threads_per_core", *cpuOptions.ThreadsPerCore) - } - - publicIP := instanceOutput[instanceID].PublicIpAddress - if publicIP != nil { - events[instanceID].MetricSetFields.Put("instance.public.ip", *publicIP) - } - - privateIP := instanceOutput[instanceID].PrivateIpAddress - if privateIP != nil { - events[instanceID].MetricSetFields.Put("instance.private.ip", *privateIP) - } - - events[instanceID].MetricSetFields.Put("instance.image.id", *instanceOutput[instanceID].ImageId) - events[instanceID].MetricSetFields.Put("instance.state.name", instanceStateName) - events[instanceID].MetricSetFields.Put("instance.state.code", *instanceOutput[instanceID].State.Code) - events[instanceID].MetricSetFields.Put("instance.monitoring.state", monitoringState) - events[instanceID].MetricSetFields.Put("instance.public.dns_name", *instanceOutput[instanceID].PublicDnsName) - events[instanceID].MetricSetFields.Put("instance.private.dns_name", *instanceOutput[instanceID].PrivateDnsName) - } - } - - for idStat, metricSetFieldsPerInstance := range metricSetFieldResults { - instanceID := idStat.instanceID - statistic := idStat.statistic - - var resultMetricsetFields common.MapStr - var err error - - if len(metricSetFieldsPerInstance) != 0 { - if statistic == "Average" { - // Use "Average" statistic method for CPU and status metrics - resultMetricsetFields, err = aws.EventMapping(metricSetFieldsPerInstance, schemaMetricSetFieldsAverage) - } else if statistic == "Sum" { - // Use "Sum" statistic method for disk and network metrics - resultMetricsetFields, err = aws.EventMapping(metricSetFieldsPerInstance, schemaMetricSetFieldsSum) - } - - if err != nil { - return events, errors.Wrap(err, "EventMapping failed") - } - - // add host cpu/network/disk fields and host.id - addHostFields(resultMetricsetFields, events[instanceID].RootFields, instanceID) - - // add rate metrics - 
calculateRate(resultMetricsetFields, monitoringStates[instanceID]) - - events[instanceID].MetricSetFields.Update(resultMetricsetFields) - } - } - - return events, nil -} - -func calculateRate(resultMetricsetFields common.MapStr, monitoringState string) { - var period = 300.0 - if monitoringState != "disabled" { - period = 60.0 - } - - metricList := []string{ - "network.in.bytes", - "network.out.bytes", - "network.in.packets", - "network.out.packets", - "diskio.read.bytes", - "diskio.write.bytes", - "diskio.read.count", - "diskio.write.count"} - - for _, metricName := range metricList { - metricValue, err := resultMetricsetFields.GetValue(metricName) - if err == nil && metricValue != nil { - rateValue := metricValue.(float64) / period - resultMetricsetFields.Put(metricName+"_per_sec", rateValue) - } - } -} - -func addHostFields(resultMetricsetFields common.MapStr, rootFields common.MapStr, instanceID string) { - rootFields.Put("host.id", instanceID) - - // If there is no instance name, use instance ID as the host.name - hostName, err := rootFields.GetValue("host.name") - if err == nil && hostName != nil { - rootFields.Put("host.name", hostName) - } else { - rootFields.Put("host.name", instanceID) - } - - hostFieldTable := map[string]string{ - "cpu.total.pct": "host.cpu.usage", - "network.in.bytes": "host.network.ingress.bytes", - "network.out.bytes": "host.network.egress.bytes", - "network.in.packets": "host.network.ingress.packets", - "network.out.packets": "host.network.egress.packets", - "diskio.read.bytes": "host.disk.read.bytes", - "diskio.write.bytes": "host.disk.write.bytes", - } - - for ec2MetricName, hostMetricName := range hostFieldTable { - metricValue, err := resultMetricsetFields.GetValue(ec2MetricName) - if err != nil { - continue - } - - if value, ok := metricValue.(float64); ok { - if ec2MetricName == "cpu.total.pct" { - value = value / 100 - } - rootFields.Put(hostMetricName, value) - } - } -} - -func getInstancesPerRegion(svc ec2iface.ClientAPI) (instanceIDs []string, instancesOutputs map[string]ec2.Instance, err error) { - instancesOutputs = map[string]ec2.Instance{} - output := ec2.DescribeInstancesOutput{NextToken: nil} - init := true - for init || output.NextToken != nil { - init = false - describeInstanceInput := &ec2.DescribeInstancesInput{} - req := svc.DescribeInstancesRequest(describeInstanceInput) - output, err := req.Send(context.Background()) - if err != nil { - err = errors.Wrap(err, "Error DescribeInstances") - return nil, nil, err - } - - for _, reservation := range output.Reservations { - for _, instance := range reservation.Instances { - instanceIDs = append(instanceIDs, *instance.InstanceId) - instancesOutputs[*instance.InstanceId] = instance - } - } - } - return -} - -func createMetricDataQuery(metric cloudwatch.Metric, instanceID string, index int, period time.Duration, statistic string) (metricDataQuery cloudwatch.MetricDataQuery) { - periodInSeconds := int64(period.Seconds()) - id := metricsetName + statistic + strconv.Itoa(index) - metricDims := metric.Dimensions - - for _, dim := range metricDims { - if *dim.Name == "InstanceId" && *dim.Value == instanceID { - metricName := *metric.MetricName - label := newLabel(instanceID, metricName, statistic).JSON() - metricDataQuery = cloudwatch.MetricDataQuery{ - Id: &id, - MetricStat: &cloudwatch.MetricStat{ - Period: &periodInSeconds, - Stat: &statistic, - Metric: &metric, - }, - Label: &label, - } - return - } - } - return -} - -func newLabel(instanceID string, metricName string, statistic string) *label { 
- return &label{InstanceID: instanceID, MetricName: metricName, Statistic: statistic} -} - -// JSON is a method of label object for converting label to string -func (l *label) JSON() string { - // Ignore error, this cannot fail - out, _ := json.Marshal(l) - return string(out) -} - -func newLabelFromJSON(labelJSON string) (label, error) { - labelStruct := label{} - err := json.Unmarshal([]byte(labelJSON), &labelStruct) - if err != nil { - return labelStruct, fmt.Errorf("json.Unmarshal failed: %w", err) - } - return labelStruct, nil -} diff --git a/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go b/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go index d790140d7e7..b0ab20cd4a3 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" + _ "github.com/elastic/beats/v7/libbeat/processors/actions" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/mtest" ) @@ -32,8 +33,6 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { config := mtest.GetConfigForTest(t, "ec2", "300s") - metricSet := mbtest.NewReportingMetricSetV2Error(t, config) - if err := mbtest.WriteEventsReporterV2Error(metricSet, t, "/"); err != nil { - t.Fatal("write", err) - } + metricSet := mbtest.NewFetcher(t, config) + metricSet.WriteEvents(t, "/") } diff --git a/x-pack/metricbeat/module/aws/ec2/ec2_test.go b/x-pack/metricbeat/module/aws/ec2/ec2_test.go index f42e47bea81..dd51b4985fb 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2_test.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2_test.go @@ -2,554 +2,20 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -// +build !integration - package ec2 import ( - "net/http" - "testing" - "time" - - awssdk "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/cloudwatch" - "github.com/aws/aws-sdk-go-v2/service/ec2" - "github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface" - "github.com/stretchr/testify/assert" + "os" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" -) - -// MockEC2Client struct is used for unit tests. 
-type MockEC2Client struct { - ec2iface.ClientAPI -} - -var ( - regionName = "us-west-1" - instanceID = "i-123" - namespace = "AWS/EC2" - statistic = "Average" - - id1 = "cpu1" - metricName1 = "CPUUtilization" - label1 = newLabel(instanceID, metricName1, statistic).JSON() - id2 = "status1" - metricName2 = "StatusCheckFailed" - label2 = newLabel(instanceID, metricName2, statistic).JSON() - - id3 = "status2" - metricName3 = "StatusCheckFailed_System" - label3 = newLabel(instanceID, metricName3, statistic).JSON() - - id4 = "status3" - metricName4 = "StatusCheckFailed_Instance" - label4 = newLabel(instanceID, metricName4, statistic).JSON() + // Register input module and metricset + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws/cloudwatch" ) -func (m *MockEC2Client) DescribeRegionsRequest(input *ec2.DescribeRegionsInput) ec2.DescribeRegionsRequest { - return ec2.DescribeRegionsRequest{ - Request: &awssdk.Request{ - Data: &ec2.DescribeRegionsOutput{ - Regions: []ec2.Region{ - { - RegionName: ®ionName, - }, - }, - }, - }, - } -} - -func (m *MockEC2Client) DescribeInstancesRequest(input *ec2.DescribeInstancesInput) ec2.DescribeInstancesRequest { - runningCode := int64(16) - coreCount := int64(1) - threadsPerCore := int64(1) - publicDNSName := "ec2-1-2-3-4.us-west-1.compute.amazonaws.com" - publicIP := "1.2.3.4" - privateDNSName := "ip-5-6-7-8.us-west-1.compute.internal" - privateIP := "5.6.7.8" - - tags := []ec2.Tag{ - { - Key: awssdk.String("app.kubernetes.io/name"), - Value: awssdk.String("foo"), - }, - { - Key: awssdk.String("helm.sh/chart"), - Value: awssdk.String("foo-chart"), - }, - { - Key: awssdk.String("Name"), - Value: awssdk.String("test-instance"), - }, - } - - instance := ec2.Instance{ - InstanceId: awssdk.String(instanceID), - InstanceType: ec2.InstanceTypeT2Medium, - Placement: &ec2.Placement{ - AvailabilityZone: awssdk.String("us-west-1a"), - }, - ImageId: awssdk.String("image-123"), - State: &ec2.InstanceState{ - Name: ec2.InstanceStateNameRunning, - Code: &runningCode, - }, - Monitoring: &ec2.Monitoring{ - State: ec2.MonitoringStateDisabled, - }, - CpuOptions: &ec2.CpuOptions{ - CoreCount: &coreCount, - ThreadsPerCore: &threadsPerCore, - }, - PublicDnsName: &publicDNSName, - PublicIpAddress: &publicIP, - PrivateDnsName: &privateDNSName, - PrivateIpAddress: &privateIP, - Tags: tags, - } - - httpReq, _ := http.NewRequest("", "", nil) - return ec2.DescribeInstancesRequest{ - Request: &awssdk.Request{ - Data: &ec2.DescribeInstancesOutput{ - Reservations: []ec2.Reservation{ - {Instances: []ec2.Instance{instance}}, - }, - }, - HTTPRequest: httpReq, - }, - } -} - -func TestGetInstanceIDs(t *testing.T) { - mockSvc := &MockEC2Client{} - instanceIDs, instancesOutputs, err := getInstancesPerRegion(mockSvc) - if err != nil { - t.FailNow() - } - - assert.Equal(t, 1, len(instanceIDs)) - assert.Equal(t, 1, len(instancesOutputs)) - - assert.Equal(t, instanceID, instanceIDs[0]) - assert.Equal(t, ec2.InstanceType("t2.medium"), instancesOutputs[instanceID].InstanceType) - assert.Equal(t, awssdk.String("image-123"), instancesOutputs[instanceID].ImageId) - assert.Equal(t, awssdk.String("us-west-1a"), instancesOutputs[instanceID].Placement.AvailabilityZone) -} - -func TestCreateCloudWatchEventsDedotTags(t *testing.T) { - expectedEvent := mb.Event{ - RootFields: common.MapStr{ - "cloud": common.MapStr{ - "region": regionName, - "provider": "aws", - "instance": common.MapStr{"id": "i-123", "name": "test-instance"}, - "machine": 
common.MapStr{"type": "t2.medium"}, - "availability_zone": "us-west-1a", - }, - "host": common.MapStr{ - "cpu": common.MapStr{"usage": 0.0025}, - "id": "i-123", - "name": "test-instance", - }, - }, - MetricSetFields: common.MapStr{ - "cpu": common.MapStr{ - "total": common.MapStr{"pct": 0.25}, - }, - "instance": common.MapStr{ - "image": common.MapStr{"id": "image-123"}, - "core": common.MapStr{"count": int64(1)}, - "threads_per_core": int64(1), - "state": common.MapStr{"code": int64(16), "name": "running"}, - "monitoring": common.MapStr{"state": "disabled"}, - "public": common.MapStr{ - "dns_name": "ec2-1-2-3-4.us-west-1.compute.amazonaws.com", - "ip": "1.2.3.4", - }, - "private": common.MapStr{ - "dns_name": "ip-5-6-7-8.us-west-1.compute.internal", - "ip": "5.6.7.8", - }, - }, - "tags": common.MapStr{ - "app_kubernetes_io/name": "foo", - "helm_sh/chart": "foo-chart", - "Name": "test-instance", - }, - }, - } - svcEC2Mock := &MockEC2Client{} - instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock) - assert.NoError(t, err) - assert.Equal(t, 1, len(instanceIDs)) - instanceID := instanceIDs[0] - assert.Equal(t, instanceID, instanceID) - timestamp := time.Now() - - getMetricDataOutput := []cloudwatch.MetricDataResult{ - { - Id: &id1, - Label: &label1, - Values: []float64{0.25}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id2, - Label: &label2, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id3, - Label: &label3, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id4, - Label: &label4, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - } - - metricSet := MetricSet{ - &aws.MetricSet{}, - logp.NewLogger("test"), - } - - events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1") - assert.NoError(t, err) - assert.Equal(t, 1, len(events)) - assert.Equal(t, expectedEvent.RootFields, events[instanceID].RootFields) - assert.Equal(t, expectedEvent.MetricSetFields["cpu"], events[instanceID].MetricSetFields["cpu"]) - assert.Equal(t, expectedEvent.MetricSetFields["instance"], events[instanceID].MetricSetFields["instance"]) - assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"]) -} - -func TestCreateCloudWatchEventsWithTagsFilter(t *testing.T) { - expectedEvent := mb.Event{ - RootFields: common.MapStr{ - "cloud": common.MapStr{ - "region": regionName, - "provider": "aws", - "instance": common.MapStr{"id": "i-123", "name": "test-instance"}, - "machine": common.MapStr{"type": "t2.medium"}, - "availability_zone": "us-west-1a", - }, - "host": common.MapStr{ - "cpu": common.MapStr{"usage": 0.0025}, - "id": "i-123", - "name": "test-instance", - }, - }, - MetricSetFields: common.MapStr{ - "cpu": common.MapStr{ - "total": common.MapStr{"pct": 0.25}, - }, - "instance": common.MapStr{ - "image": common.MapStr{"id": "image-123"}, - "core": common.MapStr{"count": int64(1)}, - "threads_per_core": int64(1), - "state": common.MapStr{"code": int64(16), "name": "running"}, - "monitoring": common.MapStr{"state": "disabled"}, - "public": common.MapStr{ - "dns_name": "ec2-1-2-3-4.us-west-1.compute.amazonaws.com", - "ip": "1.2.3.4", - }, - "private": common.MapStr{ - "dns_name": "ip-5-6-7-8.us-west-1.compute.internal", - "ip": "5.6.7.8", - }, - }, - "tags": common.MapStr{ - "app_kubernetes_io/name": "foo", - "helm_sh/chart": "foo-chart", - "Name": "test-instance", - }, - }, - } - - svcEC2Mock := &MockEC2Client{} - instanceIDs, 
instancesOutputs, err := getInstancesPerRegion(svcEC2Mock) - assert.NoError(t, err) - assert.Equal(t, 1, len(instanceIDs)) - instanceID := instanceIDs[0] - assert.Equal(t, instanceID, instanceID) - timestamp := time.Now() - - getMetricDataOutput := []cloudwatch.MetricDataResult{ - { - Id: &id1, - Label: &label1, - Values: []float64{0.25}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id2, - Label: &label2, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id3, - Label: &label3, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id4, - Label: &label4, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - } - - metricSet := MetricSet{ - &aws.MetricSet{ - TagsFilter: []aws.Tag{{ - Key: "app.kubernetes.io/name", - Value: "foo", - }}, - }, - logp.NewLogger("test"), - } - events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1") - - assert.NoError(t, err) - assert.Equal(t, 1, len(events)) - assert.Equal(t, expectedEvent.RootFields, events[instanceID].RootFields) - assert.Equal(t, expectedEvent.MetricSetFields["cpu"], events[instanceID].MetricSetFields["cpu"]) - assert.Equal(t, expectedEvent.MetricSetFields["instance"], events[instanceID].MetricSetFields["instance"]) - assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"]) -} - -func TestCreateCloudWatchEventsWithNotMatchingTagsFilter(t *testing.T) { - svcEC2Mock := &MockEC2Client{} - instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock) - assert.NoError(t, err) - assert.Equal(t, 1, len(instanceIDs)) - instanceID := instanceIDs[0] - assert.Equal(t, instanceID, instanceID) - timestamp := time.Now() - - getMetricDataOutput := []cloudwatch.MetricDataResult{ - { - Id: &id1, - Label: &label1, - Values: []float64{0.25}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id2, - Label: &label2, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id3, - Label: &label3, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - { - Id: &id4, - Label: &label4, - Values: []float64{0.0}, - Timestamps: []time.Time{timestamp}, - }, - } - - metricSet := MetricSet{ - &aws.MetricSet{ - TagsFilter: []aws.Tag{{ - Key: "app_kubernetes_io/name", - Value: "not_foo", - }}, - }, - logp.NewLogger("test"), - } - events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1") - assert.NoError(t, err) - assert.Equal(t, 0, len(events)) -} - -func TestConstructMetricQueries(t *testing.T) { - name := "InstanceId" - dim := cloudwatch.Dimension{ - Name: &name, - Value: &instanceID, - } - - listMetric := cloudwatch.Metric{ - Dimensions: []cloudwatch.Dimension{dim}, - MetricName: &metricName1, - Namespace: &namespace, - } - - listMetricsOutput := []cloudwatch.Metric{listMetric} - metricDataQuery := constructMetricQueries(listMetricsOutput, instanceID, 5*time.Minute) - assert.Equal(t, 2, len(metricDataQuery)) - assert.Equal(t, "{\"InstanceID\":\"i-123\",\"MetricName\":\"CPUUtilization\",\"Statistic\":\"Average\"}", *metricDataQuery[0].Label) - assert.Equal(t, "Average", *metricDataQuery[0].MetricStat.Stat) - assert.Equal(t, metricName1, *metricDataQuery[0].MetricStat.Metric.MetricName) - assert.Equal(t, namespace, *metricDataQuery[0].MetricStat.Metric.Namespace) -} - -func TestCalculateRate(t *testing.T) { - resultMetricsetFields := common.MapStr{ - "network.in.bytes": 1367316.0, - "network.out.bytes": 427380.0, - 
"network.in.packets": 2895.0, - "network.out.packets": 2700.0, - "diskio.read.bytes": 300.0, - "diskio.write.bytes": 600.0, - "diskio.read.count": 30.0, - "diskio.write.count": 60.0, - } - - cases := []struct { - rateMetricName string - rateMetricValue float64 - rateMetricValueDetailed float64 - }{ - { - "network.in.bytes_per_sec", - 4557.72, - 22788.6, - }, - { - "network.out.bytes_per_sec", - 1424.6, - 7123.0, - }, - { - "network.in.packets_per_sec", - 9.65, - 48.25, - }, - { - "network.out.packets_per_sec", - 9.0, - 45.0, - }, - { - "diskio.read.bytes_per_sec", - 1.0, - 5.0, - }, - { - "diskio.write.bytes_per_sec", - 2.0, - 10.0, - }, - { - "diskio.read.count_per_sec", - 0.1, - 0.5, - }, - { - "diskio.write.count_per_sec", - 0.2, - 1.0, - }, - } - - calculateRate(resultMetricsetFields, "disabled") - for _, c := range cases { - output, err := resultMetricsetFields.GetValue(c.rateMetricName) - assert.NoError(t, err) - assert.Equal(t, c.rateMetricValue, output) - } - - calculateRate(resultMetricsetFields, "enabled") - for _, c := range cases { - output, err := resultMetricsetFields.GetValue(c.rateMetricName) - assert.NoError(t, err) - assert.Equal(t, c.rateMetricValueDetailed, output) - } -} - -func TestCreateCloudWatchEventsWithInstanceName(t *testing.T) { - expectedEvent := mb.Event{ - RootFields: common.MapStr{ - "cloud": common.MapStr{ - "region": regionName, - "provider": "aws", - "instance": common.MapStr{"id": "i-123", "name": "test-instance"}, - "machine": common.MapStr{"type": "t2.medium"}, - "availability_zone": "us-west-1a", - }, - "host": common.MapStr{ - "cpu": common.MapStr{"usage": 0.25}, - "id": "i-123", - }, - }, - MetricSetFields: common.MapStr{ - "tags": common.MapStr{ - "app_kubernetes_io/name": "foo", - "helm_sh/chart": "foo-chart", - "Name": "test-instance", - }, - }, - } - svcEC2Mock := &MockEC2Client{} - instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock) - assert.NoError(t, err) - assert.Equal(t, 1, len(instanceIDs)) - instanceID := instanceIDs[0] - assert.Equal(t, instanceID, instanceID) - timestamp := time.Now() - - getMetricDataOutput := []cloudwatch.MetricDataResult{ - { - Id: &id1, - Label: &label1, - Values: []float64{0.25}, - Timestamps: []time.Time{timestamp}, - }, - } - - metricSet := MetricSet{ - &aws.MetricSet{}, - logp.NewLogger("test"), - } - - events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1") - assert.NoError(t, err) - assert.Equal(t, 1, len(events)) - - assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"]) - - hostID, err := events[instanceID].RootFields.GetValue("host.id") - assert.NoError(t, err) - assert.Equal(t, "i-123", hostID) - - instanceName, err := events[instanceID].RootFields.GetValue("cloud.instance.name") - assert.NoError(t, err) - assert.Equal(t, "test-instance", instanceName) -} - -func TestNewLabel(t *testing.T) { - instanceID := "i-123" - metricName := "CPUUtilization" - statistic := "Average" - label := newLabel(instanceID, metricName, statistic).JSON() - assert.Equal(t, "{\"InstanceID\":\"i-123\",\"MetricName\":\"CPUUtilization\",\"Statistic\":\"Average\"}", label) -} - -func TestConvertLabel(t *testing.T) { - labelStr := "{\"InstanceID\":\"i-123\",\"MetricName\":\"CPUUtilization\",\"Statistic\":\"Average\"}" - label, err := newLabelFromJSON(labelStr) - assert.NoError(t, err) - assert.Equal(t, "i-123", label.InstanceID) - assert.Equal(t, "CPUUtilization", label.MetricName) - assert.Equal(t, "Average", label.Statistic) 
+func init() { + // To be moved to some kind of helper + os.Setenv("BEAT_STRICT_PERMS", "false") + mb.Registry.SetSecondarySource(mb.NewLightModulesSource("../../../module")) } diff --git a/x-pack/metricbeat/module/aws/ec2/manifest.yml b/x-pack/metricbeat/module/aws/ec2/manifest.yml new file mode 100644 index 00000000000..fe914e5c37b --- /dev/null +++ b/x-pack/metricbeat/module/aws/ec2/manifest.yml @@ -0,0 +1,85 @@ +default: true +input: + module: aws + metricset: cloudwatch + defaults: + metrics: + - namespace: AWS/EC2 + statistic: ["Average"] + name: + - CPUUtilization + - CPUCreditUsage + - CPUCreditBalance + - CPUSurplusCreditBalance + - CPUSurplusCreditsCharged + - StatusCheckFailed + - StatusCheckFailed_Instance + - StatusCheckFailed_System + - namespace: AWS/EC2 + statistic: ["Sum" ] + name: + - DiskReadBytes + - DiskReadOps + - DiskWriteBytes + - DiskWriteOps + - NetworkIn + - NetworkPacketsIn + - NetworkOut + - NetworkPacketsOut +processors: + - rename: + ignore_missing: true + fields: + - from: "aws.ec2.metrics.CPUUtilization.avg" + to: "aws.ec2.cpu.total.pct" + - from: "aws.ec2.metrics.CPUCreditUsage.avg" + to: "aws.ec2.cpu.credit_usage" + - from: "aws.ec2.metrics.CPUCreditBalance.avg" + to: "aws.ec2.cpu.credit_balance" + - from: "aws.ec2.metrics.CPUSurplusCreditBalance.avg" + to: "aws.ec2.cpu.surplus_credit_balance" + - from: "aws.ec2.metrics.CPUSurplusCreditsCharged.avg" + to: "aws.ec2.cpu.surplus_credits_charged" + - from: "aws.ec2.metrics.StatusCheckFailed.avg" + to: "aws.ec2.status.check_failed" + - from: "aws.ec2.metrics.StatusCheckFailed_Instance.avg" + to: "aws.ec2.status.check_failed_instance" + - from: "aws.ec2.metrics.StatusCheckFailed_System.avg" + to: "aws.ec2.status.check_failed_system" + - from: "aws.ec2.metrics.DiskReadBytes.sum" + to: "aws.ec2.diskio.read.bytes" + - from: "aws.ec2.metrics.DiskReadOps.sum" + to: "aws.ec2.diskio.read.count" + - from: "aws.ec2.metrics.DiskWriteBytes.sum" + to: "aws.ec2.diskio.write.bytes" + - from: "aws.ec2.metrics.DiskWriteOps.sum" + to: "aws.ec2.diskio.write.count" + - from: "aws.ec2.metrics.NetworkIn.sum" + to: "aws.ec2.network.in.bytes" + - from: "aws.ec2.metrics.NetworkPacketsIn.sum" + to: "aws.ec2.network.in.packets" + - from: "aws.ec2.metrics.NetworkOut.sum" + to: "aws.ec2.network.out.bytes" + - from: "aws.ec2.metrics.NetworkPacketsOut.sum" + to: "aws.ec2.network.out.packets" + - from: "aws.ec2.metrics.NetworkIn.rate" + to: "aws.ec2.network.in.bytes_per_sec" + - from: "aws.ec2.metrics.NetworkOut.rate" + to: "aws.ec2.network.out.bytes_per_sec" + - from: "aws.ec2.metrics.NetworkPacketsIn.rate" + to: "aws.ec2.network.in.packets_per_sec" + - from: "aws.ec2.metrics.NetworkPacketsOut.rate" + to: "aws.ec2.network.out.packets_per_sec" + - from: "aws.ec2.metrics.DiskReadBytes.rate" + to: "aws.ec2.diskio.read.bytes_per_sec" + - from: "aws.ec2.metrics.DiskWriteBytes.rate" + to: "aws.ec2.diskio.write.bytes_per_sec" + - from: "aws.ec2.metrics.DiskReadOps.rate" + to: "aws.ec2.diskio.read.count_per_sec" + - from: "aws.ec2.metrics.DiskWriteOps.rate" + to: "aws.ec2.diskio.write.count_per_sec" + + - drop_fields: + ignore_missing: true + fields: + - "aws.ec2.metrics" diff --git a/x-pack/metricbeat/module/aws/module.yml b/x-pack/metricbeat/module/aws/module.yml index c6129426e8b..0ccce838bc7 100644 --- a/x-pack/metricbeat/module/aws/module.yml +++ b/x-pack/metricbeat/module/aws/module.yml @@ -1,5 +1,6 @@ name: aws metricsets: + - ec2 - elb - ebs - usage
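
Editor's note: with this change the aws ec2 metricset becomes a light module driven by the cloudwatch metricset (the manifest.yml above), so the user-facing configuration stays the same. The manifest's rename/drop_fields processors map the generic aws.ec2.metrics.* fields emitted by the cloudwatch metricset back to the documented aws.ec2.* names, and the new per-namespace metadata hook (addMetadata -> ec2.AddMetadata) re-attaches the instance and host fields that the deleted ec2.go previously added. Below is a minimal configuration sketch of how the migrated metricset would typically be enabled; the credential keys shown are illustrative placeholders, and the 300s vs. 60s period guidance follows the existing aws module documentation rather than anything introduced by this diff.

metricbeat.modules:
  - module: aws
    # 300s for basic monitoring; use 60s (or a multiple) if detailed
    # monitoring is enabled on the EC2 instances.
    period: 300s
    metricsets:
      - ec2
    access_key_id: '${AWS_ACCESS_KEY_ID}'
    secret_access_key: '${AWS_SECRET_ACCESS_KEY}'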