Skip to content

Commit

Permalink
[Metricbeat] Add region to googlecloud module config (elastic#16203)
Browse files Browse the repository at this point in the history
* Add region to googlecloud module config

* Add changelog

* add warning when zone and region are both provided in config

* add more unit test for getFilterForMetric

* check if instance is nil before checking labels/machinetype
  • Loading branch information
kaiyan-sheng authored Feb 21, 2020
1 parent f9fe104 commit f006db1
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add collecting AuroraDB metrics in rds metricset. {issue}14142[14142] {pull}16004[16004]
- Reuse connections in SQL module. {pull}16001[16001]
- Improve the `logstash` module (when `xpack.enabled` is set to `true`) to use the override `cluster_uuid` returned by Logstash APIs. {issue}15772[15772] {pull}15795[15795]
- Add region parameter in googlecloud module. {issue}15780[15780] {pull}16203[16203]
- Add kubernetes storage class support via kube-state-metrics. {pull}16145[16145]
- Add support for NATS 2.1. {pull}16317[16317]
- Add Load Balancing metricset to GCP {pull}15559[15559]
Expand Down
1 change: 1 addition & 0 deletions metricbeat/docs/modules/googlecloud.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Note: extra GCP charges on Stackdriver Monitoring API requests will be generated
This is a list of the possible module parameters you can tune:

* *zone*: A single string with the zone you want to monitor like "us-central1-a". If you need to fetch from multiple regions, you have to setup a different configuration for each (but you don't need a new instance of Metricbeat running)
* *region*: A single string with the region you want to monitor like "us-central1". This will enable monitoring for all zones under this region.
* *project_id*: A single string with your GCP Project ID
* *credentials_file_path*: A single string pointing to the JSON file path reachable by Metricbeat that you have created using IAM.
* *exclude_labels*: (`true`/`false` default `false`) Do not extract extra labels and metadata information from Metricsets and fetch metrics onlly. At the moment, *labels and metadata extraction is only supported* in Compute Metricset.
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/googlecloud/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Note: extra GCP charges on Stackdriver Monitoring API requests will be generated
This is a list of the possible module parameters you can tune:

* *zone*: A single string with the zone you want to monitor like "us-central1-a". If you need to fetch from multiple regions, you have to setup a different configuration for each (but you don't need a new instance of Metricbeat running)
* *region*: A single string with the region you want to monitor like "us-central1". This will enable monitoring for all zones under this region.
* *project_id*: A single string with your GCP Project ID
* *credentials_file_path*: A single string pointing to the JSON file path reachable by Metricbeat that you have created using IAM.
* *exclude_labels*: (`true`/`false` default `false`) Do not extract extra labels and metadata information from Metricsets and fetch metrics onlly. At the moment, *labels and metadata extraction is only supported* in Compute Metricset.
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/googlecloud/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type MetadataCollectorInputData struct {
TimeSeries *monitoringpb.TimeSeries
ProjectID string
Zone string
Region string
Point *monitoringpb.Point
Timestamp *time.Time
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
)

// NewMetadataService returns the specific Metadata service for a GCP Compute resource
func NewMetadataService(projectID, zone string, opt ...option.ClientOption) (googlecloud.MetadataService, error) {
func NewMetadataService(projectID, zone string, region string, opt ...option.ClientOption) (googlecloud.MetadataService, error) {
return &metadataCollector{
projectID: projectID,
zone: zone,
region: region,
opt: opt,
instanceCache: common.NewCache(30*time.Second, 13),
}, nil
Expand All @@ -48,6 +49,7 @@ type computeMetadata struct {
type metadataCollector struct {
projectID string
zone string
region string
opt []option.ClientOption

computeMetadata *computeMetadata
Expand All @@ -58,7 +60,7 @@ type metadataCollector struct {
// Metadata implements googlecloud.MetadataCollector to the known set of labels from a Compute TimeSeries single point of data.
func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.TimeSeries) (googlecloud.MetadataCollectorData, error) {
if s.computeMetadata == nil {
_, err := s.instanceMetadata(ctx, s.instanceID(resp), s.zone)
_, err := s.instanceMetadata(ctx, s.instanceID(resp), s.zone, s.region)
if err != nil {
return googlecloud.MetadataCollectorData{}, err
}
Expand Down Expand Up @@ -102,8 +104,8 @@ func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.Tim
}

// instanceMetadata returns the labels of an instance
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zone string) (*computeMetadata, error) {
i, err := s.instance(ctx, instanceID, zone)
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zone string, region string) (*computeMetadata, error) {
i, err := s.instance(ctx, instanceID, zone, region)
if err != nil {
return nil, errors.Wrapf(err, "error trying to get data from instance '%s' in zone '%s'", instanceID, zone)
}
Expand All @@ -113,6 +115,10 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zo
zone: zone,
}

if i == nil {
return s.computeMetadata, nil
}

if i.Labels != nil {
s.computeMetadata.User = i.Labels
}
Expand All @@ -133,7 +139,7 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zo
}

// instance returns data from an instance ID using the cache or making a request
func (s *metadataCollector) instance(ctx context.Context, instanceID, zone string) (i *compute.Instance, err error) {
func (s *metadataCollector) instance(ctx context.Context, instanceID, zone string, region string) (*compute.Instance, error) {
service, err := compute.NewService(ctx, s.opt...)
if err != nil {
return nil, errors.Wrapf(err, "error getting client from Compute service")
Expand All @@ -146,13 +152,34 @@ func (s *metadataCollector) instance(ctx context.Context, instanceID, zone strin
}
}

instanceData, err := service.Instances.Get(s.projectID, zone, instanceID).Do()
if err != nil {
return nil, errors.Wrapf(err, "error getting instance information for instance with ID '%s'", instanceID)
if region != "" {
regionData, err := service.Regions.Get(s.projectID, region).Do()
if err != nil {
return nil, errors.Wrapf(err, "error getting region information for '%s'", region)
}

zones := regionData.Zones
for _, zone := range zones {
zString := strings.Split(zone, "/")
zName := zString[len(zString)-1]
instanceData, err := service.Instances.Get(s.projectID, zName, instanceID).Do()
if err != nil {
continue
}
s.instanceCache.Put(instanceID, instanceData)
return instanceData, nil
}
}
s.instanceCache.Put(instanceID, instanceData)

return instanceData, nil
if zone != "" {
instanceData, err := service.Instances.Get(s.projectID, zone, instanceID).Do()
if err != nil {
return nil, errors.Wrapf(err, "error getting instance information for instance with ID '%s'", instanceID)
}
s.instanceCache.Put(instanceID, instanceData)
return instanceData, nil
}
return nil, nil
}

func (s *metadataCollector) instanceID(ts *monitoringpb.TimeSeries) string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func NewMetadataServiceForConfig(c config) (googlecloud.MetadataService, error) {
switch c.ServiceName {
case googlecloud.ServiceCompute:
return compute.NewMetadataService(c.ProjectID, c.Zone, c.opt...)
return compute.NewMetadataService(c.ProjectID, c.Zone, c.Region, c.opt...)
case googlecloud.ServicePubsub, googlecloud.ServiceLoadBalancing:
return nil, nil
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ type stackdriverMetricsRequester struct {
func (r *stackdriverMetricsRequester) Metric(ctx context.Context, m string) (out []*monitoringpb.TimeSeries) {
out = make([]*monitoringpb.TimeSeries, 0)

filter := r.getFilterForMetric(m)

req := &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + r.config.ProjectID,
Interval: r.interval,
View: monitoringpb.ListTimeSeriesRequest_FULL,
Filter: filter,
Filter: r.getFilterForMetric(m),
}

it := r.client.ListTimeSeries(ctx, req)
Expand All @@ -79,6 +77,18 @@ func (r *stackdriverMetricsRequester) Metric(ctx context.Context, m string) (out
return
}

func constructFilter(m string, region string, zone string) string {
filter := fmt.Sprintf(`metric.type="%s" AND resource.labels.zone = `, m)
// If region is specified, use region as filter resource label.
// If region is empty but zone is given, use zone instead.
if region != "" {
filter += fmt.Sprintf(`starts_with("%s")`, region)
} else if zone != "" {
filter += fmt.Sprintf(`"%s"`, zone)
}
return filter
}

func (r *stackdriverMetricsRequester) Metrics(ctx context.Context, ms []string) ([]*monitoringpb.TimeSeries, error) {
var lock sync.Mutex
var wg sync.WaitGroup
Expand Down Expand Up @@ -120,9 +130,16 @@ func (r *stackdriverMetricsRequester) getFilterForMetric(m string) (f string) {
case googlecloud.ServicePubsub, googlecloud.ServiceLoadBalancing:
return
default:
f = fmt.Sprintf(`%s AND resource.labels.zone = "%s"`, f, r.config.Zone)
if r.config.Region != "" && r.config.Zone != "" {
r.logger.Warnf("when region %s and zone %s config parameter "+
"both are provided, only use region", r.config.Region, r.config.Zone)
}
if r.config.Region != "" {
f = fmt.Sprintf(`%s AND resource.labels.zone = starts_with("%s")`, f, r.config.Region)
} else if r.config.Zone != "" {
f = fmt.Sprintf(`%s AND resource.labels.zone = "%s"`, f, r.config.Zone)
}
}

return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,98 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
)

func TestGetFilterForMetric(t *testing.T) {
r := stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}}
func TestStringInSlice(t *testing.T) {
cases := []struct {
title string
m string
region string
zone string
expectedFilter string
}{
{
"construct filter with zone",
"compute.googleapis.com/instance/cpu/utilization",
"",
"us-east1-b",
"metric.type=\"compute.googleapis.com/instance/cpu/utilization\" AND resource.labels.zone = \"us-east1-b\"",
},
{
"construct filter with region",
"compute.googleapis.com/instance/cpu/utilization",
"us-east1",
"",
"metric.type=\"compute.googleapis.com/instance/cpu/utilization\" AND resource.labels.zone = starts_with(\"us-east1\")",
},
}

s := r.getFilterForMetric("compute.googleapis.com/firewall/dropped_bytes_count")
assert.Equal(t, `metric.type="compute.googleapis.com/firewall/dropped_bytes_count" AND resource.labels.zone = "us-central1-a"`, s)
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
filter := constructFilter(c.m, c.region, c.zone)
assert.Equal(t, c.expectedFilter, filter)
})
}
}

s = r.getFilterForMetric("pubsub.googleapis.com/subscription/ack_message_count")
assert.Equal(t, `metric.type="pubsub.googleapis.com/subscription/ack_message_count"`, s)
func TestGetFilterForMetric(t *testing.T) {
var logger = logp.NewLogger("test")
cases := []struct {
title string
m string
r stackdriverMetricsRequester
expectedFilter string
}{
{
"compute service with zone in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = \"us-central1-a\"",
},
{
"pubsub service with zone in config",
"pubsub.googleapis.com/subscription/ack_message_count",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
"metric.type=\"pubsub.googleapis.com/subscription/ack_message_count\"",
},
{
"loadbalancing service with zone in config",
"loadbalancing.googleapis.com/https/backend_latencies",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
"metric.type=\"loadbalancing.googleapis.com/https/backend_latencies\"",
},
{
"compute service with region in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = starts_with(\"us-east1\")",
},
{
"pubsub service with region in config",
"pubsub.googleapis.com/subscription/ack_message_count",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
"metric.type=\"pubsub.googleapis.com/subscription/ack_message_count\"",
},
{
"loadbalancing service with region in config",
"loadbalancing.googleapis.com/https/backend_latencies",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
"metric.type=\"loadbalancing.googleapis.com/https/backend_latencies\"",
},
{
"compute service with both region and zone in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Region: "us-central1", Zone: "us-central1-a"}, logger: logger},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = starts_with(\"us-central1\")",
},
}

s = r.getFilterForMetric("loadbalancing.googleapis.com/https/backend_latencies")
assert.Equal(t, `metric.type="loadbalancing.googleapis.com/https/backend_latencies"`, s)
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
filter := c.r.getFilterForMetric(c.m)
assert.Equal(t, c.expectedFilter, filter)
})
}
}
15 changes: 12 additions & 3 deletions x-pack/metricbeat/module/googlecloud/stackdriver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type MetricSet struct {

type config struct {
Metrics []string `config:"stackdriver.metrics" validate:"required"`
Zone string `config:"zone" validate:"required"`
Zone string `config:"zone"`
Region string `config:"region"`
ProjectID string `config:"project_id" validate:"required"`
ExcludeLabels bool `config:"exclude_labels"`
ServiceName string `config:"stackdriver.service" validate:"required"`
Expand Down Expand Up @@ -78,12 +79,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) (err error) {
reqs, err := newStackdriverMetricsRequester(ctx, m.config, m.Module().Config().Period, m.Logger())
if err != nil {
return errors.Wrapf(err, "error trying to do create a request client to GCP project '%s' in zone '%s'", m.config.ProjectID, m.config.Zone)
return errors.Wrapf(err, "error trying to do create a request client to GCP project '%s' in zone '%s' or region '%s'", m.config.ProjectID, m.config.Zone, m.config.Region)
}

responses, err := reqs.Metrics(ctx, m.config.Metrics)
if err != nil {
return errors.Wrapf(err, "error trying to get metrics for project '%s' and zone '%s'", m.config.ProjectID, m.config.Zone)

return errors.Wrapf(err, "error trying to get metrics for project '%s' and zone '%s' or region '%s'", m.config.ProjectID, m.config.Zone, m.config.Region)
}

events, err := m.eventMapping(ctx, responses)
Expand Down Expand Up @@ -145,3 +147,10 @@ func validatePeriodForGCP(d time.Duration) (err error) {

return nil
}

func (c *config) Validate() error {
if c.Region == "" && c.Zone == "" {
return errors.New("region and zone in Google Cloud config file cannot both be empty")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (m *MetricSet) timeSeriesGrouped(ctx context.Context, gcpService googleclou
return nil, err
}

sdCollectorInputData := googlecloud.NewStackdriverCollectorInputData(ts, m.config.ProjectID, m.config.Zone)
sdCollectorInputData := googlecloud.NewStackdriverCollectorInputData(ts, m.config.ProjectID, m.config.Zone, m.config.Region)

for i := range keyValues {
sdCollectorInputData.Timestamp = &keyValues[i].Timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
)

// NewStackdriverCollectorInputData returns a ready to use MetadataCollectorInputData to be sent to Metadata collectors
func NewStackdriverCollectorInputData(ts *monitoringpb.TimeSeries, projectID, zone string) *MetadataCollectorInputData {
func NewStackdriverCollectorInputData(ts *monitoringpb.TimeSeries, projectID, zone string, region string) *MetadataCollectorInputData {
return &MetadataCollectorInputData{
TimeSeries: ts,
ProjectID: projectID,
Zone: zone,
Region: region,
}
}

Expand Down

0 comments on commit f006db1

Please sign in to comment.