Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check return from report.Event and migrate to use ReportingMetricSetV2Error #11775

Merged
merged 5 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions x-pack/metricbeat/module/aws/ec2/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"aws": {
"ec2": {
"cpu": {
"credit_balance": 144,
"credit_usage": 0.001823,
"credit_usage": 0.008328,
"surplus_credit_balance": 0,
"surplus_credits_charged": 0,
"total": {
"pct": 0.033333333333303
"pct": 0.1000000000000606
}
},
"diskio": {
Expand Down Expand Up @@ -51,12 +47,12 @@
},
"network": {
"in": {
"bytes": 56,
"packets": 1
"bytes": 615.2,
"packets": 6.2
},
"out": {
"bytes": 88,
"packets": 1.6
"bytes": 841.4,
"packets": 6.8
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
}
},
"status": {
Expand Down
30 changes: 15 additions & 15 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/x-pack/metricbeat/module/aws"
)
Expand All @@ -38,13 +37,11 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
*aws.MetricSet
logger *logp.Logger
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ec2Logger := logp.NewLogger(aws.ModuleName)
metricSet, err := aws.NewMetricSet(base)
if err != nil {
return nil, errors.Wrap(err, "error creating aws metricset")
Expand All @@ -57,25 +54,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
err := errors.New("period needs to be set to 60s (or a multiple of 60s) if detailed monitoring is " +
"enabled for EC2 instances or set to 300s (or a multiple of 300s) if EC2 instances has basic monitoring. " +
"To avoid data missing or extra costs, please make sure period is set correctly in config.yml")
ec2Logger.Info(err)
base.Logger().Info(err)
}

return &MetricSet{
MetricSet: metricSet,
logger: ec2Logger,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Get startTime and endTime
startTime, endTime, err := aws.GetStartTimeEndTime(m.DurationString)
if err != nil {
m.logger.Error(errors.Wrap(err, "Error ParseDuration"))
report.Error(err)
return
return errors.Wrap(err, "Error ParseDuration")
}

for _, regionName := range m.MetricSet.RegionsList {
Expand All @@ -84,7 +78,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2)
if err != nil {
err = errors.Wrap(err, "getInstancesPerRegion failed, skipping region "+regionName)
m.logger.Errorf(err.Error())
m.Logger().Errorf(err.Error())
report.Error(err)
continue
}
Expand All @@ -93,7 +87,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
namespace := "AWS/EC2"
listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch)
if err != nil {
m.logger.Error(err.Error())
m.Logger().Error(err.Error())
report.Error(err)
continue
}
Expand All @@ -112,7 +106,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
metricDataOutput, err = aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
if err != nil {
err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName+" for instance "+instanceID)
m.logger.Error(err.Error())
m.Logger().Error(err.Error())
report.Error(err)
continue
}
Expand All @@ -121,17 +115,23 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
// Create Cloudwatch Events for EC2
event, info, err := createCloudWatchEvents(metricDataOutput, instanceID, instancesOutputs[instanceID], regionName)
if info != "" {
m.logger.Info(info)
m.Logger().Info(info)
}

if err != nil {
m.logger.Error(err.Error())
m.Logger().Error(err.Error())
report.Error(err)
continue
}
report.Event(event)

if reported := report.Event(event); !reported {
m.Logger().Debug("Fetch interrupted, failed to emit event")
return nil
}
}
}

return nil
}

func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID string, periodInSec int) []cloudwatch.MetricDataQuery {
Expand Down
22 changes: 8 additions & 14 deletions x-pack/metricbeat/module/aws/ec2/ec2_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ func TestFetch(t *testing.T) {
t.Skip("Skipping TestFetch: " + info)
}

ec2MetricSet := mbtest.NewReportingMetricSetV2(t, config)
events, errs := mbtest.ReportingFetchV2(ec2MetricSet)
if errs != nil {
t.Skip("Skipping TestFetch: failed to make api calls. Please check $AWS_ACCESS_KEY_ID, " +
"$AWS_SECRET_ACCESS_KEY and $AWS_SESSION_TOKEN in config.yml")
metricSet := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(metricSet)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
t.FailNow()
}
t.Logf("Module: %s Metricset: %s", ec2MetricSet.Module().Name(), ec2MetricSet.Name())
assert.NotEmpty(t, events)

for _, event := range events {
// RootField
Expand Down Expand Up @@ -76,9 +71,8 @@ func TestData(t *testing.T) {
t.Skip("Skipping TestData: " + info)
}

ec2MetricSet := mbtest.NewReportingMetricSetV2(t, config)
errs := mbtest.WriteEventsReporterV2(ec2MetricSet, t, "/")
if errs != nil {
t.Fatal("write", errs)
metricSet := mbtest.NewReportingMetricSetV2Error(t, config)
if err := mbtest.WriteEventsReporterV2Error(metricSet, t, "/"); err != nil {
t.Fatal("write", err)
}
}
32 changes: 13 additions & 19 deletions x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/elastic/beats/libbeat/common"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/x-pack/metricbeat/module/aws"
)
Expand All @@ -38,25 +37,18 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
*aws.MetricSet
logger *logp.Logger
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The aws s3_daily_storage metricset is beta.")
s3Logger := logp.NewLogger(aws.ModuleName)

moduleConfig := aws.Config{}
if err := base.Module().UnpackConfig(&moduleConfig); err != nil {
return nil, err
}

if moduleConfig.Period == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this change related?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related. Sorry forgot to update on the PR . This check is not needed here because it's already done in aws.NewMetricSet(base) in line 60.

err := errors.New("period is not set in AWS module config")
s3Logger.Error(err)
}

metricSet, err := aws.NewMetricSet(base)
if err != nil {
return nil, errors.Wrap(err, "error creating aws metricset")
Expand All @@ -68,27 +60,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
err := errors.New("period needs to be set to 86400s (or a multiple of 86400s). " +
"To avoid data missing or extra costs, please make sure period is set correctly " +
"in config.yml")
s3Logger.Info(err)
base.Logger().Info(err)
}

return &MetricSet{
MetricSet: metricSet,
logger: s3Logger,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
namespace := "AWS/S3"
// Get startTime and endTime
startTime, endTime, err := aws.GetStartTimeEndTime(m.DurationString)
if err != nil {
err = errors.Wrap(err, "Error ParseDuration")
m.logger.Error(err.Error())
report.Error(err)
return
return errors.Wrap(err, "Error ParseDuration")
}

// GetMetricData for AWS S3 from Cloudwatch
Expand All @@ -98,7 +86,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
listMetricsOutputs, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch)
if err != nil {
err = errors.Wrap(err, "GetListMetricsOutput failed, skipping region "+regionName)
m.logger.Error(err.Error())
m.Logger().Error(err.Error())
report.Error(err)
continue
}
Expand All @@ -112,7 +100,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
metricDataOutputs, err := aws.GetMetricDataResults(metricDataQueries, svcCloudwatch, startTime, endTime)
if err != nil {
err = errors.Wrap(err, "GetMetricDataResults failed, skipping region "+regionName)
m.logger.Error(err)
m.Logger().Error(err)
report.Error(err)
continue
}
Expand All @@ -123,14 +111,20 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
event, err := createCloudWatchEvents(metricDataOutputs, regionName, bucketName)
if err != nil {
err = errors.Wrap(err, "createCloudWatchEvents failed")
m.logger.Error(err)
m.Logger().Error(err)
event.Error = err
report.Event(event)
continue
}
report.Event(event)

if reported := report.Event(event); !reported {
m.Logger().Debug("Fetch interrupted, failed to emit event")
return nil
}
}
}

return nil
}

func getBucketNames(listMetricsOutputs []cloudwatch.Metric) (bucketNames []string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ func TestFetch(t *testing.T) {
t.Skip("Skipping TestFetch: " + info)
}

s3DailyMetricSet := mbtest.NewReportingMetricSetV2(t, config)
events, err := mbtest.ReportingFetchV2(s3DailyMetricSet)
if err != nil {
t.Skip("Skipping TestFetch: failed to make api calls. Please check $AWS_ACCESS_KEY_ID, " +
"$AWS_SECRET_ACCESS_KEY and $AWS_SESSION_TOKEN in config.yml")
metricSet := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(metricSet)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assert.Empty(t, err)
if !assert.NotEmpty(t, events) {
t.FailNow()
}
t.Logf("Module: %s Metricset: %s", s3DailyMetricSet.Module().Name(), s3DailyMetricSet.Name())
assert.NotEmpty(t, events)

for _, event := range events {
// RootField
Expand All @@ -52,9 +47,8 @@ func TestData(t *testing.T) {
t.Skip("Skipping TestData: " + info)
}

ec2MetricSet := mbtest.NewReportingMetricSetV2(t, config)
errs := mbtest.WriteEventsReporterV2(ec2MetricSet, t, "/")
if errs != nil {
t.Fatal("write", errs)
metricSet := mbtest.NewReportingMetricSetV2Error(t, config)
if err := mbtest.WriteEventsReporterV2Error(metricSet, t, "/"); err != nil {
t.Fatal("write", err)
}
}
Loading