Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Add region to googlecloud module config #16203

Merged
merged 8 commits into from
Feb 21, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add collecting AuroraDB metrics in rds metricset. {issue}14142[14142] {pull}16004[16004]
- Reuse connections in SQL module. {pull}16001[16001]
- Improve the `logstash` module (when `xpack.enabled` is set to `true`) to use the override `cluster_uuid` returned by Logstash APIs. {issue}15772[15772] {pull}15795[15795]
- Add region parameter in googlecloud module. {issue}15780[15780] {pull}16203[16203]
- Add kubernetes storage class support via kube-state-metrics. {pull}16145[16145]
- Add support for NATS 2.1. {pull}16317[16317]
- Add Load Balancing metricset to GCP {pull}15559[15559]
Expand Down
1 change: 1 addition & 0 deletions metricbeat/docs/modules/googlecloud.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Note: extra GCP charges on Stackdriver Monitoring API requests will be generated
This is a list of the possible module parameters you can tune:

* *zone*: A single string with the zone you want to monitor like "us-central1-a". If you need to fetch from multiple regions, you have to setup a different configuration for each (but you don't need a new instance of Metricbeat running)
* *region*: A single string with the region you want to monitor like "us-central1". This will enable monitoring for all zones under this region.
* *project_id*: A single string with your GCP Project ID
* *credentials_file_path*: A single string pointing to the JSON file path reachable by Metricbeat that you have created using IAM.
* *exclude_labels*: (`true`/`false` default `false`) Do not extract extra labels and metadata information from Metricsets and fetch metrics onlly. At the moment, *labels and metadata extraction is only supported* in Compute Metricset.
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/googlecloud/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Note: extra GCP charges on Stackdriver Monitoring API requests will be generated
This is a list of the possible module parameters you can tune:

* *zone*: A single string with the zone you want to monitor like "us-central1-a". If you need to fetch from multiple regions, you have to setup a different configuration for each (but you don't need a new instance of Metricbeat running)
* *region*: A single string with the region you want to monitor like "us-central1". This will enable monitoring for all zones under this region.
* *project_id*: A single string with your GCP Project ID
* *credentials_file_path*: A single string pointing to the JSON file path reachable by Metricbeat that you have created using IAM.
* *exclude_labels*: (`true`/`false` default `false`) Do not extract extra labels and metadata information from Metricsets and fetch metrics onlly. At the moment, *labels and metadata extraction is only supported* in Compute Metricset.
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/googlecloud/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type MetadataCollectorInputData struct {
TimeSeries *monitoringpb.TimeSeries
ProjectID string
Zone string
Region string
Point *monitoringpb.Point
Timestamp *time.Time
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
)

// NewMetadataService returns the specific Metadata service for a GCP Compute resource
func NewMetadataService(projectID, zone string, opt ...option.ClientOption) (googlecloud.MetadataService, error) {
func NewMetadataService(projectID, zone string, region string, opt ...option.ClientOption) (googlecloud.MetadataService, error) {
return &metadataCollector{
projectID: projectID,
zone: zone,
region: region,
opt: opt,
instanceCache: common.NewCache(30*time.Second, 13),
}, nil
Expand All @@ -48,6 +49,7 @@ type computeMetadata struct {
type metadataCollector struct {
projectID string
zone string
region string
opt []option.ClientOption

computeMetadata *computeMetadata
Expand All @@ -58,7 +60,7 @@ type metadataCollector struct {
// Metadata implements googlecloud.MetadataCollector to the known set of labels from a Compute TimeSeries single point of data.
func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.TimeSeries) (googlecloud.MetadataCollectorData, error) {
if s.computeMetadata == nil {
_, err := s.instanceMetadata(ctx, s.instanceID(resp), s.zone)
_, err := s.instanceMetadata(ctx, s.instanceID(resp), s.zone, s.region)
if err != nil {
return googlecloud.MetadataCollectorData{}, err
}
Expand Down Expand Up @@ -102,8 +104,8 @@ func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.Tim
}

// instanceMetadata returns the labels of an instance
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zone string) (*computeMetadata, error) {
i, err := s.instance(ctx, instanceID, zone)
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zone string, region string) (*computeMetadata, error) {
i, err := s.instance(ctx, instanceID, zone, region)
if err != nil {
return nil, errors.Wrapf(err, "error trying to get data from instance '%s' in zone '%s'", instanceID, zone)
}
Expand All @@ -113,6 +115,10 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zo
zone: zone,
}

if i == nil {
return s.computeMetadata, nil
}

if i.Labels != nil {
s.computeMetadata.User = i.Labels
}
Expand All @@ -133,7 +139,7 @@ func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, zo
}

// instance returns data from an instance ID using the cache or making a request
func (s *metadataCollector) instance(ctx context.Context, instanceID, zone string) (i *compute.Instance, err error) {
func (s *metadataCollector) instance(ctx context.Context, instanceID, zone string, region string) (*compute.Instance, error) {
service, err := compute.NewService(ctx, s.opt...)
if err != nil {
return nil, errors.Wrapf(err, "error getting client from Compute service")
Expand All @@ -146,13 +152,34 @@ func (s *metadataCollector) instance(ctx context.Context, instanceID, zone strin
}
}

instanceData, err := service.Instances.Get(s.projectID, zone, instanceID).Do()
if err != nil {
return nil, errors.Wrapf(err, "error getting instance information for instance with ID '%s'", instanceID)
if region != "" {
regionData, err := service.Regions.Get(s.projectID, region).Do()
if err != nil {
return nil, errors.Wrapf(err, "error getting region information for '%s'", region)
}

zones := regionData.Zones
for _, zone := range zones {
zString := strings.Split(zone, "/")
zName := zString[len(zString)-1]
instanceData, err := service.Instances.Get(s.projectID, zName, instanceID).Do()
if err != nil {
continue
}
s.instanceCache.Put(instanceID, instanceData)
return instanceData, nil
}
}
s.instanceCache.Put(instanceID, instanceData)

return instanceData, nil
if zone != "" {
instanceData, err := service.Instances.Get(s.projectID, zone, instanceID).Do()
if err != nil {
return nil, errors.Wrapf(err, "error getting instance information for instance with ID '%s'", instanceID)
}
s.instanceCache.Put(instanceID, instanceData)
return instanceData, nil
}
return nil, nil
}

func (s *metadataCollector) instanceID(ts *monitoringpb.TimeSeries) string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func NewMetadataServiceForConfig(c config) (googlecloud.MetadataService, error) {
switch c.ServiceName {
case googlecloud.ServiceCompute:
return compute.NewMetadataService(c.ProjectID, c.Zone, c.opt...)
return compute.NewMetadataService(c.ProjectID, c.Zone, c.Region, c.opt...)
case googlecloud.ServicePubsub, googlecloud.ServiceLoadBalancing:
return nil, nil
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ type stackdriverMetricsRequester struct {
func (r *stackdriverMetricsRequester) Metric(ctx context.Context, m string) (out []*monitoringpb.TimeSeries) {
out = make([]*monitoringpb.TimeSeries, 0)

filter := r.getFilterForMetric(m)

req := &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + r.config.ProjectID,
Interval: r.interval,
View: monitoringpb.ListTimeSeriesRequest_FULL,
Filter: filter,
Filter: r.getFilterForMetric(m),
}

it := r.client.ListTimeSeries(ctx, req)
Expand All @@ -79,6 +77,18 @@ func (r *stackdriverMetricsRequester) Metric(ctx context.Context, m string) (out
return
}

func constructFilter(m string, region string, zone string) string {
filter := fmt.Sprintf(`metric.type="%s" AND resource.labels.zone = `, m)
// If region is specified, use region as filter resource label.
// If region is empty but zone is given, use zone instead.
if region != "" {
filter += fmt.Sprintf(`starts_with("%s")`, region)
} else if zone != "" {
filter += fmt.Sprintf(`"%s"`, zone)
}
return filter
}

func (r *stackdriverMetricsRequester) Metrics(ctx context.Context, ms []string) ([]*monitoringpb.TimeSeries, error) {
var lock sync.Mutex
var wg sync.WaitGroup
Expand Down Expand Up @@ -120,9 +130,16 @@ func (r *stackdriverMetricsRequester) getFilterForMetric(m string) (f string) {
case googlecloud.ServicePubsub, googlecloud.ServiceLoadBalancing:
return
default:
f = fmt.Sprintf(`%s AND resource.labels.zone = "%s"`, f, r.config.Zone)
if r.config.Region != "" && r.config.Zone != "" {
r.logger.Warnf("when region %s and zone %s config parameter "+
"both are provided, only use region", r.config.Region, r.config.Zone)
}
if r.config.Region != "" {
f = fmt.Sprintf(`%s AND resource.labels.zone = starts_with("%s")`, f, r.config.Region)
} else if r.config.Zone != "" {
f = fmt.Sprintf(`%s AND resource.labels.zone = "%s"`, f, r.config.Zone)
}
}

return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,98 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
)

func TestGetFilterForMetric(t *testing.T) {
r := stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}}
func TestStringInSlice(t *testing.T) {
cases := []struct {
title string
m string
region string
zone string
expectedFilter string
}{
{
"construct filter with zone",
"compute.googleapis.com/instance/cpu/utilization",
"",
"us-east1-b",
"metric.type=\"compute.googleapis.com/instance/cpu/utilization\" AND resource.labels.zone = \"us-east1-b\"",
},
{
"construct filter with region",
"compute.googleapis.com/instance/cpu/utilization",
"us-east1",
"",
"metric.type=\"compute.googleapis.com/instance/cpu/utilization\" AND resource.labels.zone = starts_with(\"us-east1\")",
},
}

s := r.getFilterForMetric("compute.googleapis.com/firewall/dropped_bytes_count")
assert.Equal(t, `metric.type="compute.googleapis.com/firewall/dropped_bytes_count" AND resource.labels.zone = "us-central1-a"`, s)
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
filter := constructFilter(c.m, c.region, c.zone)
assert.Equal(t, c.expectedFilter, filter)
})
}
}

s = r.getFilterForMetric("pubsub.googleapis.com/subscription/ack_message_count")
assert.Equal(t, `metric.type="pubsub.googleapis.com/subscription/ack_message_count"`, s)
func TestGetFilterForMetric(t *testing.T) {
var logger = logp.NewLogger("test")
cases := []struct {
title string
m string
r stackdriverMetricsRequester
expectedFilter string
}{
{
"compute service with zone in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = \"us-central1-a\"",
},
{
"pubsub service with zone in config",
"pubsub.googleapis.com/subscription/ack_message_count",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
"metric.type=\"pubsub.googleapis.com/subscription/ack_message_count\"",
},
{
"loadbalancing service with zone in config",
"loadbalancing.googleapis.com/https/backend_latencies",
stackdriverMetricsRequester{config: config{Zone: "us-central1-a"}},
"metric.type=\"loadbalancing.googleapis.com/https/backend_latencies\"",
},
{
"compute service with region in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = starts_with(\"us-east1\")",
},
{
"pubsub service with region in config",
"pubsub.googleapis.com/subscription/ack_message_count",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
"metric.type=\"pubsub.googleapis.com/subscription/ack_message_count\"",
},
{
"loadbalancing service with region in config",
"loadbalancing.googleapis.com/https/backend_latencies",
stackdriverMetricsRequester{config: config{Region: "us-east1"}},
"metric.type=\"loadbalancing.googleapis.com/https/backend_latencies\"",
},
{
"compute service with both region and zone in config",
"compute.googleapis.com/firewall/dropped_bytes_count",
stackdriverMetricsRequester{config: config{Region: "us-central1", Zone: "us-central1-a"}, logger: logger},
"metric.type=\"compute.googleapis.com/firewall/dropped_bytes_count\" AND resource.labels.zone = starts_with(\"us-central1\")",
},
}

s = r.getFilterForMetric("loadbalancing.googleapis.com/https/backend_latencies")
assert.Equal(t, `metric.type="loadbalancing.googleapis.com/https/backend_latencies"`, s)
for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
filter := c.r.getFilterForMetric(c.m)
assert.Equal(t, c.expectedFilter, filter)
})
}
}
15 changes: 12 additions & 3 deletions x-pack/metricbeat/module/googlecloud/stackdriver/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type MetricSet struct {

type config struct {
Metrics []string `config:"stackdriver.metrics" validate:"required"`
Zone string `config:"zone" validate:"required"`
Zone string `config:"zone"`
Region string `config:"region"`
ProjectID string `config:"project_id" validate:"required"`
ExcludeLabels bool `config:"exclude_labels"`
ServiceName string `config:"stackdriver.service" validate:"required"`
Expand Down Expand Up @@ -78,12 +79,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) (err error) {
reqs, err := newStackdriverMetricsRequester(ctx, m.config, m.Module().Config().Period, m.Logger())
if err != nil {
return errors.Wrapf(err, "error trying to do create a request client to GCP project '%s' in zone '%s'", m.config.ProjectID, m.config.Zone)
return errors.Wrapf(err, "error trying to do create a request client to GCP project '%s' in zone '%s' or region '%s'", m.config.ProjectID, m.config.Zone, m.config.Region)
}

responses, err := reqs.Metrics(ctx, m.config.Metrics)
if err != nil {
return errors.Wrapf(err, "error trying to get metrics for project '%s' and zone '%s'", m.config.ProjectID, m.config.Zone)

return errors.Wrapf(err, "error trying to get metrics for project '%s' and zone '%s' or region '%s'", m.config.ProjectID, m.config.Zone, m.config.Region)
}

events, err := m.eventMapping(ctx, responses)
Expand Down Expand Up @@ -145,3 +147,10 @@ func validatePeriodForGCP(d time.Duration) (err error) {

return nil
}

func (c *config) Validate() error {
if c.Region == "" && c.Zone == "" {
return errors.New("region and zone in Google Cloud config file cannot both be empty")
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (m *MetricSet) timeSeriesGrouped(ctx context.Context, gcpService googleclou
return nil, err
}

sdCollectorInputData := googlecloud.NewStackdriverCollectorInputData(ts, m.config.ProjectID, m.config.Zone)
sdCollectorInputData := googlecloud.NewStackdriverCollectorInputData(ts, m.config.ProjectID, m.config.Zone, m.config.Region)

for i := range keyValues {
sdCollectorInputData.Timestamp = &keyValues[i].Timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
)

// NewStackdriverCollectorInputData returns a ready to use MetadataCollectorInputData to be sent to Metadata collectors
func NewStackdriverCollectorInputData(ts *monitoringpb.TimeSeries, projectID, zone string) *MetadataCollectorInputData {
func NewStackdriverCollectorInputData(ts *monitoringpb.TimeSeries, projectID, zone string, region string) *MetadataCollectorInputData {
return &MetadataCollectorInputData{
TimeSeries: ts,
ProjectID: projectID,
Zone: zone,
Region: region,
}
}

Expand Down