From 6af9c0193503c88ca0935de036aba82b38c6fdca Mon Sep 17 00:00:00 2001
From: Maurizio Branca
Date: Tue, 21 Nov 2023 09:27:09 +0100
Subject: [PATCH] Azure Monitor: adjust grouping logic and avoid duplicating
 documents to make the metricset TSDB-friendly (#36823)

## Overview (WHAT)

Here is a summary of the changes introduced with this PR:

- Update the metric grouping logic
- Track metrics collection info
- Adjust collection interval

### Update the metric grouping logic

Streamlines the metric grouping logic to include all the fields the TSDB team identified as dimensions for the Azure Metrics events.

Here are the current components of the grouping key:

- timestamp
- namespace
- resource ID
- resource sub ID
- dimensions
- time grain

It also makes the grouping logic simpler to read.

(WHY) When TSDB is enabled, Elasticsearch drops documents with the same timestamp and dimensions. The metricset must group all metric values by timestamp+dimensions and create one event for each group to avoid data loss.

### Track metrics collection info

The metricset tracks the timestamp and time grain for each metrics collection. At the beginning of each iteration, it skips collecting a value if the metricset has already collected a value within the current time grain period.

(WHY) The metricset usually collects one data point for each collection period. When the time grain is larger than the collection period, the metricset collects the identical data point multiple times. For example, consider a `PT1H` (one hour) time grain and a collection period of five minutes: without tracking, the metricset would collect the identical `PT1H` data point 12 times.

### Adjust collection interval

Change the collection interval to `[{-2 x INTERVAL},{-1 x INTERVAL})` with a delay of `{INTERVAL}`.

(WHY) The collection interval was [2x the collection period or time grain](https://github.com/elastic/beats/blob/ed34c37f59c7bc0cf9e8051f7b5327c861b59467/x-pack/metricbeat/module/azure/client.go#L110-L116). This interval is too large, and we collected multiple data points for the same metric. There was some code to drop the additional data points, but it wasn't working in all cases.

Glossary:

- collection interval: the time range used to fetch metric values.
- collection period: the time between metric collections (e.g., with a 5 min period, the metricset collects new metrics every 5 minutes).

(cherry picked from commit 886d078ba1f08532c1874aed932bd82cd0f6948a)
---
 CHANGELOG.next.asciidoc                      |   7 +
 .../metricbeat/module/azure/add_metadata.go  |  19 +-
 x-pack/metricbeat/module/azure/azure.go      |  35 +-
 x-pack/metricbeat/module/azure/client.go     | 222 +++++++--
 .../metricbeat/module/azure/client_utils.go  |  46 +-
 .../module/azure/client_utils_test.go        |  30 --
 x-pack/metricbeat/module/azure/data.go       | 442 ++++++++++++------
 x-pack/metricbeat/module/azure/data_test.go  |  78 ----
 .../module/azure/monitor/client_helper.go    |   5 +-
 9 files changed, 561 insertions(+), 323 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index b6af7e1d1fe6..ad27e5593687 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -92,6 +92,13 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Fix EC2 host.cpu.usage {pull}35717[35717]
 - Add option in SQL module to execute queries for all dbs. {pull}35688[35688]
 - Add remaining dimensions for azure storage account to make them available for tsdb enablement. {pull}36331[36331]
+- Add missing 'TransactionType' dimension for Azure Storage Account. 
{pull}36413[36413]
+- Add log error when statsd server fails to start {pull}36477[36477]
+- Fix CassandraConnectionClosures metric configuration {pull}34742[34742]
+- Fix event mapping implementation for statsd module {pull}36925[36925]
+- The region and availability_zone ECS fields are now nested within the cloud field. {pull}37015[37015]
+- Fix CPU and memory metrics collection from privileged process on Windows {issue}17314[17314]{pull}37027[37027]
+- Enhanced Azure Metrics metricset with refined grouping logic and resolved duplication issues for TSDB compatibility {pull}36823[36823]
 
 *Osquerybeat*
 
diff --git a/x-pack/metricbeat/module/azure/add_metadata.go b/x-pack/metricbeat/module/azure/add_metadata.go
index 23bf4500eb8a..2e0a3361e8be 100644
--- a/x-pack/metricbeat/module/azure/add_metadata.go
+++ b/x-pack/metricbeat/module/azure/add_metadata.go
@@ -9,6 +9,7 @@ import (
 	"github.com/elastic/elastic-agent-libs/mapstr"
 )
 
+// addHostMetadata enriches the event with host metadata.
 func addHostMetadata(event *mb.Event, metricList mapstr.M) {
 	hostFieldTable := map[string]string{
 		"percentage_cpu.avg": "host.cpu.usage",
@@ -30,24 +31,28 @@ func addHostMetadata(event *mb.Event, metricList mapstr.M) {
 			if metricName == "percentage_cpu.avg" {
 				value = value / 100
 			}
-			event.RootFields.Put(hostName, value)
+			_, _ = event.RootFields.Put(hostName, value)
 		}
 	}
 }
 
+// addCloudVMMetadata enriches the event with cloud VM metadata.
 func addCloudVMMetadata(event *mb.Event, vm VmResource, subscriptionId string) {
 	if vm.Name != "" {
-		event.RootFields.Put("cloud.instance.name", vm.Name)
-		event.RootFields.Put("host.name", vm.Name)
+		_, _ = event.RootFields.Put("cloud.instance.name", vm.Name)
+		_, _ = event.RootFields.Put("host.name", vm.Name)
 	}
+
 	if vm.Id != "" {
-		event.RootFields.Put("cloud.instance.id", vm.Id)
-		event.RootFields.Put("host.id", vm.Id)
+		_, _ = event.RootFields.Put("cloud.instance.id", vm.Id)
+		_, _ = event.RootFields.Put("host.id", vm.Id)
 	}
+
 	if vm.Size != "" {
-		event.RootFields.Put("cloud.machine.type", vm.Size)
+		_, _ = event.RootFields.Put("cloud.machine.type", vm.Size)
 	}
+
 	if subscriptionId != "" {
-		event.RootFields.Put("cloud.account.id", subscriptionId)
+		_, _ = event.RootFields.Put("cloud.account.id", subscriptionId)
 	}
 }
diff --git a/x-pack/metricbeat/module/azure/azure.go b/x-pack/metricbeat/module/azure/azure.go
index aa26a0c57057..8425e442912b 100644
--- a/x-pack/metricbeat/module/azure/azure.go
+++ b/x-pack/metricbeat/module/azure/azure.go
@@ -87,24 +87,43 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
 // It publishes the event which is then forwarded to the output. In case
 // of an error set the Error field of mb.Event or simply call report.Error().
 func (m *MetricSet) Fetch(report mb.ReporterV2) error {
+	// Initialize cloud resources and monitor metrics
+	// information.
+	//
+	// The client collects and stores:
+	// - existing cloud resource definitions (e.g. VMs, DBs, etc.)
+	// - metric definitions for the resources (e.g. CPU, memory, etc.)
+	//
+	// The metricset periodically refreshes the information
+	// after `RefreshListInterval` (default 600s for
+	// most metricsets). 
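+	//
+	// The refresh interval is set by the `refresh_list_interval`
+	// configuration option.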
	err := m.Client.InitResources(m.MapMetrics)
 	if err != nil {
 		return err
 	}
+
 	if len(m.Client.ResourceConfigurations.Metrics) == 0 {
-		// error message is previously logged in the InitResources, no error event should be created
+		// The error message was already logged in InitResources;
+		// no error event should be created.
 		return nil
 	}
-	// retrieve metrics
-	groupedMetrics := groupMetricsByResource(m.Client.ResourceConfigurations.Metrics)
-	for _, metrics := range groupedMetrics {
-		results := m.Client.GetMetricValues(metrics, report)
-		err := EventsMapping(results, m.Client, report)
-		if err != nil {
-			return fmt.Errorf("error running EventsMapping: %w", err)
+	// Group metric definitions by cloud resource ID.
+	//
+	// We group the metric definitions by resource ID to fetch
+	// metric values for each cloud resource in one API call.
+	metricsByResourceId := groupMetricsDefinitionsByResourceId(m.Client.ResourceConfigurations.Metrics)
+
+	for _, metricsDefinition := range metricsByResourceId {
+		// Fetch metric values for each resource.
+		metricValues := m.Client.GetMetricValues(metricsDefinition, report)
+
+		// Turn the metric values into events and send them to Elasticsearch.
+		if err := mapToEvents(metricValues, m.Client, report); err != nil {
+			return fmt.Errorf("error mapping metrics to events: %w", err)
 		}
 	}
+
 	return nil
 }
diff --git a/x-pack/metricbeat/module/azure/client.go b/x-pack/metricbeat/module/azure/client.go
index e9595425be39..13e2d65ec5b6 100644
--- a/x-pack/metricbeat/module/azure/client.go
+++ b/x-pack/metricbeat/module/azure/client.go
@@ -16,6 +16,71 @@ import (
 	"github.com/elastic/elastic-agent-libs/logp"
 )
 
+// NewMetricRegistry instantiates a new metric registry.
+func NewMetricRegistry() *MetricRegistry {
+	return &MetricRegistry{
+		collectionsInfo: make(map[string]MetricCollectionInfo),
+	}
+}
+
+// MetricRegistry keeps track of the last time a metric was collected and
+// the time grain used.
+//
+// This is used to avoid collecting the same metric values over and over again
+// when the time grain is larger than the collection interval.
+type MetricRegistry struct {
+	collectionsInfo map[string]MetricCollectionInfo
+}
+
+// Update updates the metric registry with the latest timestamp and
+// time grain for the given metric.
+func (m *MetricRegistry) Update(metric Metric, info MetricCollectionInfo) {
+	m.collectionsInfo[m.buildMetricKey(metric)] = info
+}
+
+// NeedsUpdate returns true if the metric values need to be collected again.
+func (m *MetricRegistry) NeedsUpdate(metric Metric) bool {
+	// The key is a combination of the namespace,
+	// resource ID and metric names.
+	metricKey := m.buildMetricKey(metric)
+
+	if info, exists := m.collectionsInfo[metricKey]; exists {
+		duration := convertTimeGrainToDuration(info.timeGrain)
+
+		// Check if the metric has been collected within a
+		// time period defined by the time grain.
+		if info.timestamp.After(time.Now().Add(duration * (-1))) {
+			return false
+		}
+	}
+
+	// If the metric is not in the registry, it means that it has never
+	// been collected before.
+	//
+	// In this case, we need to collect the metric.
+	return true
+}
+
+// buildMetricKey builds a key for the metric registry.
+//
+// The key is a combination of the namespace, resource ID and metric names.
+func (m *MetricRegistry) buildMetricKey(metric Metric) string {
+	keyComponents := []string{
+		metric.Namespace,
+		metric.ResourceId,
+	}
+	keyComponents = append(keyComponents, metric.Names...)
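+	// The resulting key looks like (illustrative resource ID):
+	// "Microsoft.Compute/virtualMachines,/subscriptions/<id>/.../vm1,Percentage CPU"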
+ + return strings.Join(keyComponents, ",") +} + +// MetricCollectionInfo contains information about the last time +// a metric was collected and the time grain used. +type MetricCollectionInfo struct { + timestamp time.Time + timeGrain string +} + // Client represents the azure client which will make use of the azure sdk go metrics related clients type Client struct { AzureMonitorService Service @@ -23,6 +88,7 @@ type Client struct { ResourceConfigurations ResourceConfiguration Log *logp.Logger Resources []Resource + MetricRegistry *MetricRegistry } // mapResourceMetrics function type will map the configuration options to client metrics (depending on the metricset) @@ -39,6 +105,7 @@ func NewClient(config Config) (*Client, error) { AzureMonitorService: azureMonitorService, Config: config, Log: logp.NewLogger("azure monitor client"), + MetricRegistry: NewMetricRegistry(), } client.ResourceConfigurations.RefreshInterval = config.RefreshListInterval @@ -52,10 +119,12 @@ func (client *Client) InitResources(fn mapResourceMetrics) error { if len(client.Config.Resources) == 0 { return fmt.Errorf("no resource options defined") } + // check if refresh interval has been set and if it has expired if !client.ResourceConfigurations.Expired() { return nil } + var metrics []Metric //reset client resources client.Resources = []Resource{} @@ -66,13 +135,15 @@ func (client *Client) InitResources(fn mapResourceMetrics) error { err = fmt.Errorf("failed to retrieve resources: %w", err) return err } + if len(resourceList) == 0 { err = fmt.Errorf("failed to retrieve resources: No resources returned using the configuration options resource ID %s, resource group %s, resource type %s, resource query %s", resource.Id, resource.Group, resource.Type, resource.Query) client.Log.Error(err) continue } - //map resources to the client + + // Map resources to the client for _, resource := range resourceList { if !containsResource(*resource.ID, client.Resources) { client.Resources = append(client.Resources, Resource{ @@ -85,10 +156,13 @@ func (client *Client) InitResources(fn mapResourceMetrics) error { Subscription: client.Config.SubscriptionId}) } } + + // Collects and stores metrics definitions for the cloud resources. resourceMetrics, err := fn(client, resourceList, resource) if err != nil { return err } + metrics = append(metrics, resourceMetrics...) } // users could add or remove resources while metricbeat is running so we could encounter the situation where resources are unavailable we log an error message (see above) @@ -97,23 +171,46 @@ func (client *Client) InitResources(fn mapResourceMetrics) error { client.Log.Debug("no resources were found based on all the configurations options entered") } client.ResourceConfigurations.Metrics = metrics + return nil } -// GetMetricValues returns the specified metric data points for the specified resource ID/namespace. -func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []Metric { - var resultedMetrics []Metric - // loop over the set of metrics +// GetMetricValues returns the metric values for the given cloud resources. +func (client *Client) GetMetricValues(metrics []Metric, reporter mb.ReporterV2) []Metric { + var result []Metric + + // Same end time for all metrics in the same batch. + referenceTime := time.Now().UTC() + interval := client.Config.Period + + // Fetch in the range [{-2 x INTERVAL},{-1 x INTERVAL}) with a delay of {INTERVAL}. 
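+	//
+	// For example, with a 5-minute period and a reference time of 12:00:00,
+	// the metricset fetches metric values in the range [11:50:00, 11:55:00).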
+	endTime := referenceTime.Add(interval * (-1))
+	startTime := endTime.Add(interval * (-1))
+	timespan := fmt.Sprintf("%s/%s", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))
+
 	for _, metric := range metrics {
-		// select period to collect metrics, will double the interval value in order to retrieve any missing values
-		//if timegrain is larger than intervalx2 then interval will be assigned the timegrain value
-		interval := client.Config.Period
-		if t := convertTimegrainToDuration(metric.TimeGrain); t > interval*2 {
-			interval = t
+		//
+		// Before fetching the metric values, check if the metric
+		// has been collected within the time grain.
+		//
+		// Why do we need this?
+		//
+		// Some metricsets contain metrics with long time grains (e.g. 1 hour).
+		//
+		// If we collect the metric values every 5 minutes, we will end up fetching
+		// the same data over and over again for all metrics with a time grain
+		// larger than 5 minutes.
+		//
+		// The registry keeps track of the last timestamp the metricset collected
+		// the metric values and the time grain used.
+		//
+		// By comparing the last collection time with the current time, and
+		// the time grain of the metric, we can determine if the metric needs
+		// to be collected again, or if we can skip it.
+		//
+		if !client.MetricRegistry.NeedsUpdate(metric) {
+			continue
 		}
-		endTime := time.Now().UTC()
-		startTime := endTime.Add(interval * (-2))
-		timespan := fmt.Sprintf("%s/%s", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339))
 
 		// build the 'filter' parameter which will contain any dimensions configured
 		var filter string
@@ -124,30 +221,66 @@ func (client *Client) GetMetricValues(metrics []Metric, report mb.ReporterV2) []
 			}
 			filter = strings.Join(filterList, " AND ")
 		}
-		resp, timegrain, err := client.AzureMonitorService.GetMetricValues(metric.ResourceSubId, metric.Namespace, metric.TimeGrain, timespan, metric.Names,
-			metric.Aggregations, filter)
+
+		// Fetch the metric values from the Azure API.
+		resp, timeGrain, err := client.AzureMonitorService.GetMetricValues(
+			metric.ResourceSubId,
+			metric.Namespace,
+			metric.TimeGrain,
+			timespan,
+			metric.Names,
+			metric.Aggregations,
+			filter,
+		)
 		if err != nil {
 			err = fmt.Errorf("error while listing metric values by resource ID %s and namespace %s: %w", metric.ResourceSubId, metric.Namespace, err)
 			client.Log.Error(err)
-			report.Error(err)
-		} else {
-			for i, currentMetric := range client.ResourceConfigurations.Metrics {
-				if matchMetrics(currentMetric, metric) {
-					current := mapMetricValues(resp, currentMetric.Values, endTime.Truncate(time.Minute).Add(interval*(-1)), endTime.Truncate(time.Minute))
-					client.ResourceConfigurations.Metrics[i].Values = current
-					if client.ResourceConfigurations.Metrics[i].TimeGrain == "" {
-						client.ResourceConfigurations.Metrics[i].TimeGrain = timegrain
-					}
-					resultedMetrics = append(resultedMetrics, client.ResourceConfigurations.Metrics[i])
+			reporter.Error(err)
+
+			// Skip this metric and continue with the next one.
+			continue
+		}
+
+		// Update the metric registry with the latest timestamp and
+		// time grain for each metric.
+		//
+		// We track the time grain Azure used for these metric values,
+		// as returned by the API response.
+		client.MetricRegistry.Update(metric, MetricCollectionInfo{
+			timeGrain: timeGrain,
+			timestamp: referenceTime,
+		})
+
+		for i, currentMetric := range client.ResourceConfigurations.Metrics {
+			if matchMetrics(currentMetric, metric) {
+				// Map the metric values from the API response.
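+				//
+				// mapMetricValues also drops metric values that were
+				// already collected in a previous run, and empty data
+				// points.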
+				current := mapMetricValues(resp, currentMetric.Values)
+				client.ResourceConfigurations.Metrics[i].Values = current
+
+				// Some predefined metricset configurations do not have a time grain.
+				// Here is an example:
+				// https://github.com/elastic/beats/blob/024a9cec6608c6f371ad1cb769649e024124ff92/x-pack/metricbeat/module/azure/database_account/manifest.yml#L11-L13
+				//
+				// Predefined metricsets sometimes have long lists of metrics
+				// with no time grains. Or users can configure their own
+				// custom metricsets with no time grain.
+				//
+				// In this case, we track the time grain returned by the API. Azure
+				// provides a default time grain for each metric.
+				if client.ResourceConfigurations.Metrics[i].TimeGrain == "" {
+					client.ResourceConfigurations.Metrics[i].TimeGrain = timeGrain
 				}
+
+				result = append(result, client.ResourceConfigurations.Metrics[i])
 			}
 		}
 	}
-	return resultedMetrics
+
+	return result
 }
 
 // CreateMetric function will create a client metric based on the resource and metrics configured
-func (client *Client) CreateMetric(resourceId string, subResourceId string, namespace string, metrics []string, aggregations string, dimensions []Dimension, timegrain string) Metric {
+func (client *Client) CreateMetric(resourceId string, subResourceId string, namespace string, metrics []string, aggregations string, dimensions []Dimension, timeGrain string) Metric {
 	if subResourceId == "" {
 		subResourceId = resourceId
 	}
@@ -158,20 +291,21 @@ func (client *Client) CreateMetric(resourceId string, subResourceId string, name
 		Names:        metrics,
 		Dimensions:   dimensions,
 		Aggregations: aggregations,
-		TimeGrain:    timegrain,
+		TimeGrain:    timeGrain,
 	}
+
 	for _, prevMet := range client.ResourceConfigurations.Metrics {
 		if len(prevMet.Values) != 0 && matchMetrics(prevMet, met) {
 			met.Values = prevMet.Values
 		}
 	}
+
 	return met
 }
 
 // MapMetricByPrimaryAggregation will map the primary aggregation of the metric definition to the client metric
-func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricDefinition, resourceId string, subResourceId string, namespace string, dim []Dimension, timegrain string) []Metric {
-	var clientMetrics []Metric
-
+func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricDefinition, resourceId string, subResourceId string, namespace string, dim []Dimension, timeGrain string) []Metric {
	clientMetrics := make([]Metric, 0)
 	metricGroups := make(map[string][]armmonitor.MetricDefinition)
 
 	for _, met := range metrics {
@@ -183,25 +317,27 @@ func (client *Client) MapMetricByPrimaryAggregation(metrics []armmonitor.MetricD
 		for _, metricName := range metricGroup {
 			metricNames = append(metricNames, *metricName.Name.Value)
 		}
-		clientMetrics = append(clientMetrics, client.CreateMetric(resourceId, subResourceId, namespace, metricNames, key, dim, timegrain))
+		clientMetrics = append(clientMetrics, client.CreateMetric(resourceId, subResourceId, namespace, metricNames, key, dim, timeGrain))
 	}
+
 	return clientMetrics
 }
 
-// GetVMForMetaData func will retrieve the vm details in order to fill in the cloud metadata and also update the client resources
-func (client *Client) GetVMForMetaData(resource *Resource, metricValues []MetricValue) VmResource {
+// GetVMForMetadata retrieves the VM details in order to fill in the cloud metadata
+// and also updates the client resources.
+func (client *Client) GetVMForMetadata(resource *Resource, referencePoint KeyValuePoint) VmResource {
 	var (
 		vm           VmResource
 		resourceName = resource.Name
 		resourceId   = resource.Id
 	)
-	// check first if this is a vm scaleset and the instance name is stored in the dimension value
-	if dimension, ok := getDimension("VMName", metricValues[0].dimensions); ok {
-		instanceId := getInstanceId(dimension.Value)
+	// Search the dimensions for the "VMName" dimension. This dimension is present for VM Scale Sets.
+	if dimensionValue, ok := getDimension("VMName", referencePoint.Dimensions); ok {
+		instanceId := getInstanceId(dimensionValue)
 		if instanceId != "" {
 			resourceId += fmt.Sprintf("/virtualMachines/%s", instanceId)
-			resourceName = dimension.Value
+			resourceName = dimensionValue
 		}
 	}
 
@@ -254,6 +390,15 @@ func (client *Client) GetResourceForMetaData(grouped Metric) Resource {
 	return Resource{}
 }
 
+// LookupResource returns the cloud resource with the given resource ID,
+// or an empty Resource if there is no match.
+func (client *Client) LookupResource(resourceId string) Resource {
+	for _, res := range client.Resources {
+		if res.Id == resourceId {
+			return res
+		}
+	}
+	return Resource{}
+}
+
 // AddVmToResource will add the vm details to the resource
 func (client *Client) AddVmToResource(resourceId string, vm VmResource) {
 	if len(vm.Id) > 0 && len(vm.Name) > 0 {
@@ -272,6 +417,7 @@ func NewMockClient() *Client {
 		AzureMonitorService: azureMockService,
 		Config:              Config{},
 		Log:                 logp.NewLogger("test azure monitor"),
+		MetricRegistry:      NewMetricRegistry(),
 	}
 	return client
 }
diff --git a/x-pack/metricbeat/module/azure/client_utils.go b/x-pack/metricbeat/module/azure/client_utils.go
index d96996c7215b..986125ba6b68 100644
--- a/x-pack/metricbeat/module/azure/client_utils.go
+++ b/x-pack/metricbeat/module/azure/client_utils.go
@@ -10,6 +10,8 @@ import (
 	"strings"
 	"time"
 
+	"github.com/elastic/elastic-agent-libs/mapstr"
+
 	"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor"
 )
 
@@ -19,7 +21,7 @@ const DefaultTimeGrain = "PT5M"
 var instanceIdRegex = regexp.MustCompile(`.*?(\d+)$`)
 
 // mapMetricValues should map the metric values
-func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue, startTime time.Time, endTime time.Time) []MetricValue {
+func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue) []MetricValue {
 	var currentMetrics []MetricValue
 	// compare with the previously returned values and filter out any double records
 	for _, v := range metrics {
@@ -28,10 +30,6 @@ func mapMetricValues(metrics []armmonitor.Metric, previousMetrics []MetricValue,
 		if metricExists(*v.Name.Value, *mv, previousMetrics) || metricIsEmpty(*mv) {
 			continue
 		}
-		// remove metric values that are not part of the timeline selected
-		if mv.TimeStamp.After(startTime) && mv.TimeStamp.Before(endTime) {
-			continue
-		}
 		// define the new metric value and match aggregations values
 		var val MetricValue
 		val.name = *v.Name.Value
@@ -133,10 +135,16 @@ func compareMetricValues(metVal *float64, metricVal *float64) bool {
 	return false
 }
 
-// convertTimegrainToDuration will convert azure timegrain options to actual duration values
-func convertTimegrainToDuration(timegrain string) time.Duration {
+// convertTimeGrainToDuration converts the Azure time grain options to the equivalent
+// `time.Duration` value.
+//
+// For example, converts "PT1M" to `time.Minute`.
+//
+// See https://docs.microsoft.com/en-us/azure/azure-monitor/platform/metrics-supported#time-grain
+// for more information.
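+//
+// Unsupported time grain values map to a zero duration.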
+func convertTimeGrainToDuration(timeGrain string) time.Duration {
 	var duration time.Duration
-	switch timegrain {
+	switch timeGrain {
 	case "PT1M":
 		duration = time.Minute
 	case "PT5M":
@@ -155,11 +163,12 @@ func convertTimegrainToDuration(timegrain string) time.Duration {
 		duration = 24 * time.Hour
 	default:
 	}
+
 	return duration
 }
 
-// groupMetricsByResource is used in order to group metrics by resource and return data faster
-func groupMetricsByResource(metrics []Metric) map[string][]Metric {
+// groupMetricsDefinitionsByResourceId groups the metric definitions by resource ID, so the metric values for each resource can be fetched in one API call.
+func groupMetricsDefinitionsByResourceId(metrics []Metric) map[string][]Metric {
 	grouped := make(map[string][]Metric)
 	for _, metric := range metrics {
 		if _, ok := grouped[metric.ResourceId]; !ok {
@@ -170,14 +179,17 @@ func groupMetricsByResource(metrics []Metric) map[string][]Metric {
 	return grouped
 }
 
-// getDimension will check if the dimension value is found in the list
-func getDimension(dimension string, dimensions []Dimension) (Dimension, bool) {
-	for _, dim := range dimensions {
-		if strings.EqualFold(dim.Name, dimension) {
-			return dim, true
+// getDimension searches for the dimension name in the given dimensions.
+func getDimension(dimensionName string, dimensions mapstr.M) (string, bool) {
+	for name, value := range dimensions {
+		if strings.EqualFold(name, dimensionName) {
+			if valueAsString, ok := value.(string); ok {
+				return valueAsString, true
+			}
 		}
 	}
-	return Dimension{}, false
+
+	return "", false
 }
 
 func containsResource(resourceId string, resources []Resource) bool {
@@ -189,6 +201,8 @@ func containsResource(resourceId string, resources []Resource) bool {
 	return false
 }
 
+// getInstanceId returns the instance id from the given dimension value.
+// The instance id is the trailing run of digits in the dimension value.
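+// For example, it returns "0" for the dimension value
+// "aks-agentpool-12628255-vmss_0".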
func getInstanceId(dimensionValue string) string { matches := instanceIdRegex.FindStringSubmatch(dimensionValue) if len(matches) == 2 { diff --git a/x-pack/metricbeat/module/azure/client_utils_test.go b/x-pack/metricbeat/module/azure/client_utils_test.go index e3e2de4f96fb..0087fc86aefd 100644 --- a/x-pack/metricbeat/module/azure/client_utils_test.go +++ b/x-pack/metricbeat/module/azure/client_utils_test.go @@ -131,36 +131,6 @@ func TestCompareMetricValues(t *testing.T) { assert.True(t, result) } -func TestGetDimension(t *testing.T) { - dimension := "VMName" - dim1 := "SlotID" - dim2 := "VNU" - dim3 := "VMName" - dimensionList := []Dimension{ - { - Name: dim1, - Value: dim1, - }, - { - Name: dim2, - Value: dim2, - }, - { - Name: dim3, - Value: dim3, - }, - } - result, ok := getDimension(dimension, dimensionList) - assert.True(t, ok) - assert.Equal(t, result.Name, dim3) - assert.Equal(t, result.Value, dim3) - dimension = "VirtualMachine" - result, ok = getDimension(dimension, dimensionList) - assert.False(t, ok) - assert.Equal(t, result.Name, "") - assert.Equal(t, result.Value, "") -} - func TestContainsResource(t *testing.T) { resourceId := "resId" resourceList := []Resource{ diff --git a/x-pack/metricbeat/module/azure/data.go b/x-pack/metricbeat/module/azure/data.go index f728dd0ddc23..b39a99d480da 100644 --- a/x-pack/metricbeat/module/azure/data.go +++ b/x-pack/metricbeat/module/azure/data.go @@ -22,89 +22,313 @@ const ( replaceUpperCaseRegex = `(?:[^A-Z_\W])([A-Z])[^A-Z]` ) -// EventsMapping will map metric values to beats events -func EventsMapping(metrics []Metric, client *Client, report mb.ReporterV2) error { - // metrics and metric values are currently grouped relevant to the azure REST API calls (metrics with the same aggregations per call) - // multiple metrics can be mapped in one event depending on the resource, namespace, dimensions and timestamp +// KeyValuePoint is a key/value point that represents a single metric value +// at a given timestamp. +// +// It also contains the metric dimensions and important metadata (resource ID, +// resource type, etc.). +type KeyValuePoint struct { + Key string + Value interface{} + Namespace string + ResourceId string + ResourceSubId string + Dimensions mapstr.M + TimeGrain string + Timestamp time.Time +} - // grouping metrics by resource and namespace - groupByResourceNamespace := make(map[string][]Metric) +// mapToKeyValuePoints maps a list of `azure.Metric` to a list of `azure.KeyValuePoint`. +// +// `azure.KeyValuePoint` struct makes grouping metrics by timestamp, dimensions, +// and other fields more straightforward than using `azure.Metric`. +// +// How? 
+//
+// `azure.Metric` has the following structure (simplified):
+//
+//	{
+//	  "namespace": "Microsoft.Compute/virtualMachineScaleSets",
+//	  "resource_id": "/subscriptions/123/resourceGroups/ABC/providers/Microsoft.Compute/virtualMachineScaleSets/aks-agentpool-12628255-vmss",
+//	  "time_grain": "PT5M",
+//	  "aggregations": "Total",
+//	  "dimensions": [
+//	    {
+//	      "name": "VMName",
+//	      "displayName": "*"
+//	    }
+//	  ],
+//	  "names": [
+//	    "Network In",
+//	    "Network Out"
+//	  ],
+//	  "values": [
+//	    {
+//	      "name": "Network In",
+//	      "timestamp": "2021-03-04T14:00:00Z",
+//	      "total": 4211652,
+//	      "dimensions": [
+//	        {
+//	          "name": "VMName",
+//	          "value": "aks-agentpool-12628255-vmss_0"
+//	        }
+//	      ]
+//	    },
+//	    {
+//	      "name": "Network Out",
+//	      "timestamp": "2021-03-04T14:00:00Z",
+//	      "total": 1105888,
+//	      "dimensions": [
+//	        {
+//	          "name": "VMName",
+//	          "value": "aks-agentpool-12628255-vmss_0"
+//	        }
+//	      ]
+//	    }
+//	  ]
+//	}
+//
+// Here we have two metric values: "Network In" and "Network Out". Each metric value
+// has a timestamp, a total, and a list of dimensions.
+//
+// To group the metrics using the `azure.Metric` structure, we need to assume that
+// all metric values have the same timestamp and dimensions. This seems true during
+// our tests, but I'm not 100% sure this is always the case.
+//
+// The alternative is to unpack the metric values into a list of `azure.KeyValuePoint`.
+//
+// The `mapToKeyValuePoints` function turns the previous `azure.Metric` into a
+// `azure.KeyValuePoint` list with the following structure (simplified):
+//
+//	[
+//
+//	  {
+//	    "key": "network_in_total.total",
+//	    "value": 4211652,
+//	    "namespace": "Microsoft.Compute/virtualMachineScaleSets",
+//	    "resource_id": "/subscriptions/123/resourceGroups/ABC/providers/Microsoft.Compute/virtualMachineScaleSets/aks-agentpool-12628255-vmss",
+//	    "time_grain": "PT5M",
+//	    "dimensions": {
+//	      "VMName": "aks-agentpool-12628255-vmss_0"
+//	    },
+//	    "time": "2021-03-04T14:00:00Z"
+//	  },
+//	  {
+//	    "key": "network_out_total.total",
+//	    "value": 1105888,
+//	    "namespace": "Microsoft.Compute/virtualMachineScaleSets",
+//	    "resource_id": "/subscriptions/123/resourceGroups/ABC/providers/Microsoft.Compute/virtualMachineScaleSets/aks-agentpool-12628255-vmss",
+//	    "time_grain": "PT5M",
+//	    "dimensions": {
+//	      "VMName": "aks-agentpool-12628255-vmss_0"
+//	    },
+//	    "time": "2021-03-04T14:00:00Z"
+//	  }
+//
+//	]
+//
+// With this structure, we can group the metrics by timestamp, dimensions, and
+// other fields without making assumptions.
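+//
+// Note: in practice each metric value carries a single aggregation (the
+// client groups the requested metrics by primary aggregation), so every
+// value maps to exactly one key/value point.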
+func mapToKeyValuePoints(metrics []Metric) []KeyValuePoint {
+	var points []KeyValuePoint
 
 	for _, metric := range metrics {
-		// check if any values are returned
-		if len(metric.Values) == 0 {
-			continue
-		}
-		// build a resource key with unique resource namespace combination
-		resNamkey := fmt.Sprintf("%s,%s", metric.ResourceId, metric.Namespace)
-		groupByResourceNamespace[resNamkey] = append(groupByResourceNamespace[resNamkey], metric)
-	}
-	// grouping metrics by the dimensions configured
-	groupByDimensions := make(map[string][]Metric)
-	for resNamKey, resourceMetrics := range groupByResourceNamespace {
-		for _, resourceMetric := range resourceMetrics {
-			if len(resourceMetric.Dimensions) == 0 {
-				groupByDimensions[resNamKey+NoDimension] = append(groupByDimensions[resNamKey+NoDimension], resourceMetric)
-			} else {
-				var dimKey string
-				for _, dim := range resourceMetric.Dimensions {
-					dimKey += dim.Name + dim.Value
+		for _, value := range metric.Values {
+			point := KeyValuePoint{
+				Timestamp:  value.timestamp,
+				Dimensions: mapstr.M{},
+			}
+
+			metricName := managePropertyName(value.name)
+			switch {
+			case value.min != nil:
+				point.Key = fmt.Sprintf("%s.%s", metricName, "min")
+				point.Value = value.min
+			case value.max != nil:
+				point.Key = fmt.Sprintf("%s.%s", metricName, "max")
+				point.Value = value.max
+			case value.avg != nil:
+				point.Key = fmt.Sprintf("%s.%s", metricName, "avg")
+				point.Value = value.avg
+			case value.total != nil:
+				point.Key = fmt.Sprintf("%s.%s", metricName, "total")
+				point.Value = value.total
+			case value.count != nil:
+				point.Key = fmt.Sprintf("%s.%s", metricName, "count")
+				point.Value = value.count
+			}
+
+			point.Namespace = metric.Namespace
+			point.ResourceId = metric.ResourceId
+			point.ResourceSubId = metric.ResourceSubId
+			point.TimeGrain = metric.TimeGrain
+
+			// The number of dimensions in the metric definition and the
+			// number of dimensions in the metric values should be the same.
+			//
+			// But, since definitions and values are retrieved from different
+			// API endpoints, we need to make sure that we don't panic if the
+			// number of dimensions is different.
+			if len(metric.Dimensions) == len(value.dimensions) {
+				// Take the dimension name from the metric definition and the
+				// dimension value from the metric value.
+				//
+				// Why?
+				//
+				// Because the dimension name in the metric value
+				// comes in lower case.
+				for _, dim := range metric.Dimensions {
+					// Dimensions from metric definition and metric value are
+					// not guaranteed to be in the same order, so we need to
+					// find by name the right value for each dimension.
+					_, _ = point.Dimensions.Put(dim.Name, getDimensionValue(dim.Name, value.dimensions))
 				}
 			}
+			points = append(points, point)
 		}
 	}
-	// grouping metric values by timestamp and creating events (for each metric the REST api can retrieve multiple metric values for same aggregation but different timeframes)
-	for _, grouped := range groupByDimensions {
-		defaultMetric := grouped[0]
-		resource := client.GetResourceForMetaData(defaultMetric)
-		groupByTimeMetrics := make(map[time.Time][]MetricValue)
-		for _, metric := range grouped {
-			for _, m := range metric.Values {
-				groupByTimeMetrics[m.timestamp] = append(groupByTimeMetrics[m.timestamp], m)
-			}
+
+	return points
+}
+
+// mapToEvents maps the metric values to events and reports them to Elasticsearch.
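+//
+// Points that share the same timestamp, namespace, resource ID, sub-resource ID,
+// dimensions, and time grain end up in the same event. When TSDB is enabled,
+// these fields identify the time series.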
+func mapToEvents(metrics []Metric, client *Client, reporter mb.ReporterV2) error {
+	// Map the metric values into a list of key/value points.
+	//
+	// This makes it easier to group the metrics by timestamp
+	// and dimensions.
+	points := mapToKeyValuePoints(metrics)
+
+	// Group the points by timestamp and other fields we consider
+	// as dimensions for the whole event.
+	//
+	// Metrics have their own dimensions, and this is fine at the
+	// metric level.
+	//
+	// We identified a set of fields we consider as dimensions
+	// at the event level. The event level dimensions define
+	// the time series when TSDB is enabled.
+	groupedPoints := make(map[string][]KeyValuePoint)
+	for _, point := range points {
+		groupingKey := fmt.Sprintf(
+			"%s,%s,%s,%s,%s,%s",
+			point.Timestamp,
+			point.Namespace,
+			point.ResourceId,
+			point.ResourceSubId,
+			point.Dimensions,
+			point.TimeGrain,
+		)
+
+		groupedPoints[groupingKey] = append(groupedPoints[groupingKey], point)
+	}
+
+	// Create an event for each group of points and send
+	// it to Elasticsearch.
+	for _, _points := range groupedPoints {
+		if len(_points) == 0 {
+			// This should never happen, but I don't feel like
+			// writing points[0] without checking the length first.
+			continue
 		}
-		for timestamp, groupTimeValues := range groupByTimeMetrics {
-			var event mb.Event
-			var metricList mapstr.M
-			var vm VmResource
-			// group events by dimension values
-			exists, validDimensions := returnAllDimensions(defaultMetric.Dimensions)
-			if exists {
-				for _, selectedDimension := range validDimensions {
-					groupByDimensions := make(map[string][]MetricValue)
-					for _, dimGroupValue := range groupTimeValues {
-						dimKey := fmt.Sprintf("%s,%s", selectedDimension.Name, getDimensionValue(selectedDimension.Name, dimGroupValue.dimensions))
-						groupByDimensions[dimKey] = append(groupByDimensions[dimKey], dimGroupValue)
-					}
-					for _, groupDimValues := range groupByDimensions {
-						manageAndReportEvent(client, report, event, metricList, vm, timestamp, defaultMetric, resource, groupDimValues)
-					}
-				}
-			} else {
-				manageAndReportEvent(client, report, event, metricList, vm, timestamp, defaultMetric, resource, groupTimeValues)
-			}
+
+		// We assume that all points have the same timestamp and
+		// dimensions because they were grouped by the same key.
+		//
+		// We use the reference point to get the resource ID and
+		// all other information common to all points.
+		referencePoint := _points[0]
+
+		// Look up the full cloud resource information in the cache.
+		resource := client.LookupResource(referencePoint.ResourceId)
+
+		// Build the event using all the information we have.
+		event, err := buildEventFrom(referencePoint, _points, resource, client.Config.DefaultResourceType)
+		if err != nil {
+			return err
 		}
+
+		//
+		// Enrich the event with cloud metadata.
+		//
+		if client.Config.AddCloudMetadata {
+			vm := client.GetVMForMetadata(&resource, referencePoint)
+			addCloudVMMetadata(&event, vm, resource.Subscription)
+		}
+
+		//
+		// Report the event to Elasticsearch.
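+		// The reporter hands the event over to the Beats publishing pipeline.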
+		//
+		reporter.Event(event)
 	}
+
 	return nil
 }
 
-// manageAndReportEvent function will handle event creation and report
-func manageAndReportEvent(client *Client, report mb.ReporterV2, event mb.Event, metricList mapstr.M, vm VmResource, timestamp time.Time, defaultMetric Metric, resource Resource, groupedValues []MetricValue) {
-	event, metricList = createEvent(timestamp, defaultMetric, resource, groupedValues)
-	if client.Config.AddCloudMetadata {
-		vm = client.GetVMForMetaData(&resource, groupedValues)
-		addCloudVMMetadata(&event, vm, resource.Subscription)
+// buildEventFrom builds an event from a group of points.
+func buildEventFrom(referencePoint KeyValuePoint, points []KeyValuePoint, resource Resource, defaultResourceType string) (mb.Event, error) {
+	event := mb.Event{
+		ModuleFields: mapstr.M{
+			"timegrain": referencePoint.TimeGrain,
+			"namespace": referencePoint.Namespace,
+			"resource": mapstr.M{
+				"type":  resource.Type,
+				"group": resource.Group,
+				"name":  resource.Name,
+			},
+			"subscription_id": resource.Subscription,
+		},
+		MetricSetFields: mapstr.M{},
+		Timestamp:       referencePoint.Timestamp,
+		RootFields: mapstr.M{
+			"cloud": mapstr.M{
+				"provider": "azure",
+				"region":   resource.Location,
+			},
+		},
 	}
-	if client.Config.DefaultResourceType == "" {
-		event.ModuleFields.Put("metrics", metricList)
+
+	if referencePoint.ResourceSubId != "" {
+		_, _ = event.ModuleFields.Put("resource.id", referencePoint.ResourceSubId)
+	} else {
+		_, _ = event.ModuleFields.Put("resource.id", resource.Id)
+	}
+	if len(resource.Tags) > 0 {
+		_, _ = event.ModuleFields.Put("resource.tags", resource.Tags)
+	}
+
+	if len(referencePoint.Dimensions) > 0 {
+		for key, value := range referencePoint.Dimensions {
+			if value == "*" {
+				_, _ = event.ModuleFields.Put(fmt.Sprintf("dimensions.%s", managePropertyName(key)), getDimensionValueForKeyValuePoint(key, referencePoint.Dimensions))
+			} else {
+				_, _ = event.ModuleFields.Put(fmt.Sprintf("dimensions.%s", managePropertyName(key)), value)
+			}
+		}
+	}
+
+	metricList := mapstr.M{}
+	for _, point := range points {
+		_, _ = metricList.Put(point.Key, point.Value)
+	}
+
+	// I don't know why we are doing it, but we need to keep it
+	// for now for backwards compatibility.
+	//
+	// There are Metricbeat modules and Elastic Agent integrations
+	// that rely on this.
+	if defaultResourceType == "" {
+		_, _ = event.ModuleFields.Put("metrics", metricList)
 	} else {
 		for key, metric := range metricList {
-			event.MetricSetFields.Put(key, metric)
+			_, _ = event.MetricSetFields.Put(key, metric)
 		}
 	}
-	report.Event(event)
+
+	// Enrich the event with host metadata.
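+	// For example, addHostMetadata maps the `percentage_cpu.avg` metric to
+	// the `host.cpu.usage` root field, scaling the value from 0-100 to 0-1.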
+	addHostMetadata(&event, metricList)
+
+	return event, nil
+}
 
 // managePropertyName function will handle metric names, there are several formats the metric names are written
@@ -152,96 +376,24 @@ func ReplaceUpperCase(src string) string {
 	})
 }
 
-// createEvent will create a new base event
-func createEvent(timestamp time.Time, metric Metric, resource Resource, metricValues []MetricValue) (mb.Event, mapstr.M) {
-
-	event := mb.Event{
-		ModuleFields: mapstr.M{
-			"timegrain": metric.TimeGrain,
-			"namespace": metric.Namespace,
-			"resource": mapstr.M{
-				"type":  resource.Type,
-				"group": resource.Group,
-				"name":  resource.Name,
-			},
-			"subscription_id": resource.Subscription,
-		},
-		MetricSetFields: mapstr.M{},
-		Timestamp:       timestamp,
-		RootFields: mapstr.M{
-			"cloud": mapstr.M{
-				"provider": "azure",
-				"region":   resource.Location,
-			},
-		},
-	}
-	if metric.ResourceSubId != "" {
-		event.ModuleFields.Put("resource.id", metric.ResourceSubId)
-	} else {
-		event.ModuleFields.Put("resource.id", resource.Id)
-	}
-	if len(resource.Tags) > 0 {
-		event.ModuleFields.Put("resource.tags", resource.Tags)
-	}
-
-	if len(metric.Dimensions) > 0 {
-		for _, dimension := range metric.Dimensions {
-			if dimension.Value == "*" {
-				event.ModuleFields.Put(fmt.Sprintf("dimensions.%s", managePropertyName(dimension.Name)), getDimensionValue(dimension.Name, metricValues[0].dimensions))
-			} else {
-				event.ModuleFields.Put(fmt.Sprintf("dimensions.%s", managePropertyName(dimension.Name)), dimension.Value)
-			}
-
-		}
-	}
-
-	metricList := mapstr.M{}
-	for _, value := range metricValues {
-		metricNameString := fmt.Sprintf("%s", managePropertyName(value.name))
-		if value.min != nil {
-			metricList.Put(fmt.Sprintf("%s.%s", metricNameString, "min"), *value.min)
-		}
-		if value.max != nil {
-			metricList.Put(fmt.Sprintf("%s.%s", metricNameString, "max"), *value.max)
-		}
-		if value.avg != nil {
-			metricList.Put(fmt.Sprintf("%s.%s", metricNameString, "avg"), *value.avg)
-		}
-		if value.total != nil {
-			metricList.Put(fmt.Sprintf("%s.%s", metricNameString, "total"), *value.total)
-		}
-		if value.count != nil {
-			metricList.Put(fmt.Sprintf("%s.%s", metricNameString, "count"), *value.count)
-		}
-	}
-	addHostMetadata(&event, metricList)
-
-	return event, metricList
-}
-
 // getDimensionValue will return dimension value for the key provided
 func getDimensionValue(dimension string, dimensions []Dimension) string {
 	for _, dim := range dimensions {
-		if strings.ToLower(dim.Name) == strings.ToLower(dimension) {
+		if strings.EqualFold(dim.Name, dimension) {
 			return dim.Value
 		}
 	}
+
 	return ""
 }
 
-// returnAllDimensions will check if users has entered a filter for all dimension values (*)
-func returnAllDimensions(dimensions []Dimension) (bool, []Dimension) {
-	if len(dimensions) == 0 {
-		return false, nil
-	}
-	var dims []Dimension
-	for _, dim := range dimensions {
-		if dim.Value == "*" {
-			dims = append(dims, dim)
+// getDimensionValueForKeyValuePoint will return dimension value for the key provided
+func getDimensionValueForKeyValuePoint(dimension string, dimensions mapstr.M) string {
+	for key, value := range dimensions {
+		if strings.EqualFold(key, dimension) {
+			return fmt.Sprintf("%v", value)
 		}
 	}
-	if len(dims) == 0 {
-		return false, nil
-	}
-	return true, dims
+
+	return ""
 }
diff --git a/x-pack/metricbeat/module/azure/data_test.go b/x-pack/metricbeat/module/azure/data_test.go
index 409980b6459f..8ea3144a0a71 100644
--- a/x-pack/metricbeat/module/azure/data_test.go
+++ b/x-pack/metricbeat/module/azure/data_test.go
@@ -6,31 +6,10 @@ package azure
 
 import (
"testing" - "time" "github.com/stretchr/testify/assert" - - "github.com/elastic/elastic-agent-libs/mapstr" ) -func TestReturnAllDimensions(t *testing.T) { - dimensionList := []Dimension{ - { - Value: "vm1", - Name: "VMName", - }, - { - Value: "*", - Name: "SlotID", - }, - } - result, dims := returnAllDimensions(dimensionList) - assert.True(t, result) - assert.Equal(t, len(dims), 1) - assert.Equal(t, dims[0].Name, "SlotID") - assert.Equal(t, dims[0].Value, "*") -} - func TestGetDimensionValue(t *testing.T) { dimensionList := []Dimension{ { @@ -67,60 +46,3 @@ func TestManagePropertyName(t *testing.T) { result = managePropertyName("Percentage CPU") assert.Equal(t, result, "percentage_cpu") } - -func TestCreateEvent(t *testing.T) { - createTime, err := time.Parse(time.RFC3339, "2020-02-28T20:53:03Z") - if !assert.NoError(t, err) { - t.Fatal(err) - } - resource := Resource{ - Id: "resId", - Name: "res", - Location: "west_europe", - Type: "resType", - Group: "resGroup", - Tags: nil, - Subscription: "subId", - } - metric := Metric{ - ResourceId: "resId", - Namespace: "namespace1", - Names: []string{"Percentage CPU"}, - Aggregations: "", - Dimensions: nil, - Values: nil, - TimeGrain: "", - } - var total float64 = 23 - metricValues := []MetricValue{ - { - name: "Percentage CPU", - avg: nil, - min: nil, - max: nil, - total: &total, - count: nil, - timestamp: time.Time{}, - dimensions: nil, - }, - } - event, list := createEvent(createTime, metric, resource, metricValues) - assert.NotNil(t, event) - assert.NotNil(t, list) - assert.Equal(t, event.Timestamp, createTime) - sub, err := event.ModuleFields.GetValue("subscription_id") - if !assert.NoError(t, err) { - t.Fatal(err) - } - assert.Equal(t, sub, resource.Subscription) - namespace, err := event.ModuleFields.GetValue("namespace") - if !assert.NoError(t, err) { - t.Fatal(err) - } - assert.Equal(t, namespace, metric.Namespace) - val, err := list.GetValue("percentage_cpu") - if !assert.NoError(t, err) { - t.Fatal(err) - } - assert.Equal(t, val.(mapstr.M), mapstr.M{"total": total}) -} diff --git a/x-pack/metricbeat/module/azure/monitor/client_helper.go b/x-pack/metricbeat/module/azure/monitor/client_helper.go index 61254685bb7c..9d69f67f6876 100644 --- a/x-pack/metricbeat/module/azure/monitor/client_helper.go +++ b/x-pack/metricbeat/module/azure/monitor/client_helper.go @@ -27,6 +27,7 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE if err != nil { return nil, fmt.Errorf("no metric definitions were found for resource %s and namespace %s %w", *resource.ID, metric.Namespace, err) } + if len(metricDefinitions.Value) == 0 { if metric.IgnoreUnsupported { client.Log.Infof(missingNamespace, *resource.ID, metric.Namespace) @@ -63,6 +64,7 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE } } } + return metrics, nil } @@ -70,7 +72,8 @@ func mapMetrics(client *azure.Client, resources []*armresources.GenericResourceE func filterMetricNames(resourceId string, metricConfig azure.MetricConfig, metricDefinitions []*armmonitor.MetricDefinition) ([]string, error) { var supportedMetricNames []string var unsupportedMetricNames []string - // check if all metric names are selected (*) + // If users selected the wildcard option (*), we add + // all the metric definitions to the supported metric. if strings.Contains(strings.Join(metricConfig.Name, " "), "*") { for _, definition := range metricDefinitions { supportedMetricNames = append(supportedMetricNames, *definition.Name.Value)