Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] GCP cloudsql metadata #33066

Merged
merged 13 commits into from
Nov 15, 2022
227 changes: 227 additions & 0 deletions x-pack/metricbeat/module/gcp/metrics/cloudsql/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package cloudsql

import (
"context"
"errors"
"fmt"
"strings"

"google.golang.org/api/option"
sqladmin "google.golang.org/api/sqladmin/v1"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"

"github.com/elastic/beats/v7/x-pack/metricbeat/module/gcp"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// NewMetadataService returns the specific Metadata service for a GCP CloudSQL resource.
func NewMetadataService(projectID, zone string, region string, regions []string, opt ...option.ClientOption) (gcp.MetadataService, error) {
return &metadataCollector{
projectID: projectID,
zone: zone,
region: region,
regions: regions,
opt: opt,
instances: make(map[string]*sqladmin.DatabaseInstance),
logger: logp.NewLogger("metrics-cloudsql"),
}, nil
}

// cloudsqlMetadata is an object to store data in between the extraction and the writing in the destination (to uncouple
// reading and writing in the same method)
type cloudsqlMetadata struct {
// projectID string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove the projectID, machineType and ts here?

region string
instanceID string
instanceName string
// machineType string
databaseVersion string

// ts *monitoringpb.TimeSeries

User map[string]string
Metadata map[string]string
Metrics interface{}
System interface{}
}

type metadataCollector struct {
projectID string
zone string
region string
regions []string
opt []option.ClientOption
// NOTE: instances holds data used for all metrics collected in a given period
// this avoids calling the remote endpoint for each metric, which would take a long time overall
instances map[string]*sqladmin.DatabaseInstance
endorama marked this conversation as resolved.
Show resolved Hide resolved
logger *logp.Logger
}

func getDatabaseNameAndVersion(db string) mapstr.M {
parts := strings.SplitN(strings.ToLower(db), "_", 2)

var cloudsqlDb mapstr.M

switch {
case db == "SQL_DATABASE_VERSION_UNSPECIFIED":
cloudsqlDb = mapstr.M{
"name": "sql",
"version": "unspecified",
}
case strings.Contains(parts[0], "sqlserver"):
cloudsqlDb = mapstr.M{
"name": strings.ToLower(parts[0]),
"version": strings.ToLower(parts[1]),
}
default:
version := strings.ReplaceAll(parts[1], "_", ".")
cloudsqlDb = mapstr.M{
"name": strings.ToLower(parts[0]),
"version": version,
}
}

return cloudsqlDb
}

// Metadata implements googlecloud.MetadataCollector to the known set of labels from a CloudSQL TimeSeries single point of data.
func (s *metadataCollector) Metadata(ctx context.Context, resp *monitoringpb.TimeSeries) (gcp.MetadataCollectorData, error) {
cloudsqlMetadata, err := s.instanceMetadata(ctx, s.instanceID(resp), s.instanceRegion(resp))
if err != nil {
return gcp.MetadataCollectorData{}, err
}

stackdriverLabels := gcp.NewStackdriverMetadataServiceForTimeSeries(resp)

metadataCollectorData, err := stackdriverLabels.Metadata(ctx, resp)
if err != nil {
return gcp.MetadataCollectorData{}, err
}

if resp.Resource != nil && resp.Resource.Labels != nil {
_, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceIDKey, cloudsqlMetadata.instanceID)
}

if resp.Metric.Labels != nil {
_, _ = metadataCollectorData.ECS.Put(gcp.ECSCloudInstanceNameKey, cloudsqlMetadata.instanceName)
}

cloudsqlMetadata.Metrics = metadataCollectorData.Labels[gcp.LabelMetrics]
cloudsqlMetadata.System = metadataCollectorData.Labels[gcp.LabelSystem]

if cloudsqlMetadata.databaseVersion != "" {
err := mapstr.MergeFields(metadataCollectorData.Labels, mapstr.M{
"cloudsql": getDatabaseNameAndVersion(cloudsqlMetadata.databaseVersion),
}, true)
if err != nil {
s.logger.Warnf("failed merging cloudsql to label fields: %v", err)
}
}

return metadataCollectorData, nil
}

func (s *metadataCollector) instanceID(ts *monitoringpb.TimeSeries) string {
if ts.Resource != nil && ts.Resource.Labels != nil {
return ts.Resource.Labels["database_id"]
}

return ""
}

func (s *metadataCollector) instanceRegion(ts *monitoringpb.TimeSeries) string {
if ts.Resource != nil && ts.Resource.Labels != nil {
return ts.Resource.Labels["region"]
}

return ""
}

// instanceMetadata returns the labels of an instance
func (s *metadataCollector) instanceMetadata(ctx context.Context, instanceID, region string) (*cloudsqlMetadata, error) {
instance, err := s.instance(ctx, instanceID)
if err != nil {
return nil, fmt.Errorf("error trying to get data from instance '%s' %w", instanceID, err)
}

cloudsqlMetadata := &cloudsqlMetadata{
instanceID: instanceID,
region: region,
}

if instance == nil {
s.logger.Debugf("couldn't find instance %s, call sqladmin Instances.List", instanceID)
return cloudsqlMetadata, nil
}

if instance.DatabaseVersion != "" {
cloudsqlMetadata.databaseVersion = instance.DatabaseVersion
}

if instance.Name != "" {
cloudsqlMetadata.instanceName = instance.Name
}

return cloudsqlMetadata, nil
}

func (s *metadataCollector) ID(ctx context.Context, in *gcp.MetadataCollectorInputData) (string, error) {
metadata, err := s.Metadata(ctx, in.TimeSeries)
if err != nil {
return "", err
}

metadata.ECS.Update(metadata.Labels)
if in.Timestamp != nil {
_, _ = metadata.ECS.Put("timestamp", in.Timestamp)
} else if in.Point != nil {
_, _ = metadata.ECS.Put("timestamp", in.Point.Interval.EndTime)
} else {
return "", errors.New("no timestamp information found")
}

return metadata.ECS.String(), nil
}

func (s *metadataCollector) instance(ctx context.Context, instanceName string) (*sqladmin.DatabaseInstance, error) {
s.getInstances(ctx)

instance, ok := s.instances[instanceName]
if ok {
return instance, nil
}

// Remake the compute instances map to avoid having stale data.
s.instances = make(map[string]*sqladmin.DatabaseInstance)

return nil, nil
}

func (s *metadataCollector) getInstances(ctx context.Context) {
if len(s.instances) > 0 {
return
}

s.logger.Debug("sqladmin Instances.List API")

service, err := sqladmin.NewService(ctx, s.opt...)
if err != nil {
s.logger.Errorf("error getting client from sqladmin service: %v", err)
return
}

req := service.Instances.List(s.projectID)
if err := req.Pages(ctx, func(page *sqladmin.InstancesListResponse) error {
for _, instancesScopedList := range page.Items {
s.instances[fmt.Sprintf("%s:%s", instancesScopedList.Project, instancesScopedList.Name)] = instancesScopedList
}
return nil
}); err != nil {
s.logger.Errorf("sqladmin Instances.List error: %v", err)
}
}
Loading