-
Notifications
You must be signed in to change notification settings - Fork 236
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a prometheus label mapping component
- Loading branch information
Showing
5 changed files
with
451 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
139 changes: 139 additions & 0 deletions
139
docs/sources/reference/components/prometheus/prometheus.mapping.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
--- | ||
canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.mapping/ | ||
aliases: | ||
- ../prometheus.mapping/ # /docs/alloy/latest/reference/components/prometheus.mapping/ | ||
description: Learn about prometheus.mapping | ||
title: prometheus.mapping | ||
--- | ||
|
||
# prometheus.mapping | ||
|
||
Prometheus metrics follow the [OpenMetrics](https://openmetrics.io/) format. | ||
Each time series is uniquely identified by its metric name, plus optional | ||
key-value pairs called labels. Each sample represents a datapoint in the | ||
time series and contains a value and an optional timestamp. | ||
``` | ||
<metric name>{<label_1>=<label_val_1>, <label_2>=<label_val_2> ...} <value> [timestamp] | ||
``` | ||
|
||
The `prometheus.mapping` component create a new label on each metric passed | ||
along to the exported receiver by applying a mapping table to a label value. | ||
|
||
The most common use of `prometheus.mapping` is to create a new label based filter Prometheus metrics or | ||
standardize the label set that is passed to one or more downstream | ||
receivers. The `rule` blocks are applied to the label set of each metric in | ||
order of their appearance in the configuration file. The configured rules can | ||
be retrieved by calling the function in the `rules` export field. | ||
|
||
Multiple `prometheus.mapping` components can be specified by giving them | ||
different labels. | ||
|
||
## Usage | ||
|
||
```alloy | ||
prometheus.mapping "LABEL" { | ||
forward_to = RECEIVER_LIST | ||
source_label = "labelA" | ||
target_label = "labelB" | ||
mapping = { | ||
"from" = "to", | ||
... | ||
} | ||
} | ||
``` | ||
|
||
## Arguments | ||
|
||
The following arguments are supported: | ||
|
||
Name | Type | Description | Default | Required | ||
---- | ---- | ----------- | ------- | -------- | ||
`forward_to` | `list(MetricsReceiver)` | Where the metrics should be forwarded to, after relabeling takes place. | | yes | ||
`source_label` | `string` | Name of the source label to use for mapping. | | yes | ||
`target_label` | `string` | Name of the target label to use for mapping. | | yes | ||
`mapping` | `map(string,string)` | Mapping from source label value to target labek value. | | yes | ||
|
||
## Exported fields | ||
|
||
The following fields are exported and can be referenced by other components: | ||
|
||
Name | Type | Description | ||
---- | ---- | ----------- | ||
`receiver` | `MetricsReceiver` | The input receiver where samples are sent to be relabeled. | ||
|
||
## Component health | ||
|
||
`prometheus.mapping` is only reported as unhealthy if given an invalid | ||
configuration. In those cases, exported fields are kept at their last healthy | ||
values. | ||
|
||
## Debug information | ||
|
||
`prometheus.mapping` does not expose any component-specific debug information. | ||
|
||
## Debug metrics | ||
|
||
* `prometheus_mapping_metrics_processed` (counter): Total number of metrics processed. | ||
* `prometheus_mapping_metrics_written` (counter): Total number of metrics written. | ||
|
||
## Example | ||
|
||
Let's create an instance of a see `prometheus.mapping` component and see how | ||
it acts on the following metrics. | ||
|
||
```alloy | ||
prometheus.mapping "keep_backend_only" { | ||
forward_to = [prometheus.remote_write.onprem.receiver] | ||
source_label = "app" | ||
target_label = "team" | ||
mapping = { | ||
"frontend" = "teamA" | ||
"backend" = "teamB" | ||
"database" = "teamC" | ||
} | ||
} | ||
``` | ||
|
||
``` | ||
metric_a{__address__ = "localhost", instance = "development", app = "frontend"} 10 | ||
metric_a{__address__ = "localhost", instance = "development", app = "backend"} 2 | ||
metric_a{__address__ = "cluster_a", instance = "production", app = "frontend"} 7 | ||
metric_a{__address__ = "cluster_a", instance = "production", app = "backend"} 9 | ||
metric_a{__address__ = "cluster_b", instance = "production", app = "database"} 4 | ||
``` | ||
|
||
After applying the mapping a new label `team` is created based on mapping table | ||
and `app` label value. | ||
|
||
``` | ||
metric_a{team = "teamA", __address__ = "localhost", instance = "development", app = "frontend"} 10 | ||
metric_a{team = "teamB", __address__ = "localhost", instance = "development", app = "backend"} 2 | ||
metric_a{host = "teamA", __address__ = "cluster_a", instance = "production", app = "frontend"} 7 | ||
metric_a{host = "teamA", __address__ = "cluster_a", instance = "production", app = "backend"} 9 | ||
metric_a{host = "teamC", __address__ = "cluster_a", instance = "production", app = "database"} 4 | ||
``` | ||
|
||
The resulting metrics are then propagated to each receiver defined in the | ||
`forward_to` argument. | ||
<!-- START GENERATED COMPATIBLE COMPONENTS --> | ||
|
||
## Compatible components | ||
|
||
`prometheus.mapping` can accept arguments from the following components: | ||
|
||
- Components that export [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-exporters) | ||
|
||
`prometheus.mapping` has exports that can be consumed by the following components: | ||
|
||
- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers) | ||
|
||
{{< admonition type="note" >}} | ||
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. | ||
Refer to the linked documentation for more details. | ||
{{< /admonition >}} | ||
|
||
<!-- END GENERATED COMPATIBLE COMPONENTS --> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
package mapping | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/grafana/alloy/internal/component" | ||
"github.com/grafana/alloy/internal/component/prometheus" | ||
"github.com/grafana/alloy/internal/featuregate" | ||
"github.com/grafana/alloy/internal/service/labelstore" | ||
"github.com/grafana/alloy/internal/service/livedebugging" | ||
prometheus_client "github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/model/exemplar" | ||
"github.com/prometheus/prometheus/model/histogram" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/model/metadata" | ||
"github.com/prometheus/prometheus/storage" | ||
"go.uber.org/atomic" | ||
) | ||
|
||
const name = "prometheus.mapping" | ||
|
||
func init() { | ||
component.Register(component.Registration{ | ||
Name: name, | ||
Stability: featuregate.StabilityGenerallyAvailable, | ||
Args: Arguments{}, | ||
Exports: Exports{}, | ||
|
||
Build: func(opts component.Options, args component.Arguments) (component.Component, error) { | ||
return New(opts, args.(Arguments)) | ||
}, | ||
}) | ||
} | ||
|
||
// Arguments holds values which are used to configure the prometheus.relabel | ||
// component. | ||
type Arguments struct { | ||
// Where the relabelled metrics should be forwarded to. | ||
ForwardTo []storage.Appendable `alloy:"forward_to,attr"` | ||
|
||
// Labels to use for mapping | ||
SourceLabel string `alloy:"source_label,attr"` | ||
TargetLabel string `alloy:"target_label,attr"` | ||
|
||
// Mapping | ||
LabelValuesMapping map[string]string `alloy:"mapping,attr"` | ||
} | ||
|
||
// SetToDefault implements syntax.Defaulter. | ||
func (arg *Arguments) SetToDefault() { | ||
} | ||
|
||
// Validate implements syntax.Validator. | ||
func (arg *Arguments) Validate() error { | ||
return nil | ||
} | ||
|
||
// Exports holds values which are exported by the prometheus.relabel component. | ||
type Exports struct { | ||
Receiver storage.Appendable `alloy:"receiver,attr"` | ||
} | ||
|
||
// Component implements the prometheus.mapping component. | ||
type Component struct { | ||
sourceLabel string | ||
targetLabel string | ||
|
||
mappings map[string]string | ||
mut sync.RWMutex | ||
opts component.Options | ||
receiver *prometheus.Interceptor | ||
metricsProcessed prometheus_client.Counter | ||
metricsOutgoing prometheus_client.Counter | ||
fanout *prometheus.Fanout | ||
exited atomic.Bool | ||
ls labelstore.LabelStore | ||
|
||
debugDataPublisher livedebugging.DebugDataPublisher | ||
} | ||
|
||
var ( | ||
_ component.Component = (*Component)(nil) | ||
_ component.LiveDebugging = (*Component)(nil) | ||
) | ||
|
||
// New creates a new prometheus.relabel component. | ||
func New(o component.Options, args Arguments) (*Component, error) { | ||
debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
data, err := o.GetServiceData(labelstore.ServiceName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
c := &Component{ | ||
opts: o, | ||
ls: data.(labelstore.LabelStore), | ||
sourceLabel: args.SourceLabel, | ||
targetLabel: args.TargetLabel, | ||
mappings: args.LabelValuesMapping, | ||
debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), | ||
} | ||
c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{ | ||
Name: "alloy_prometheus_mapping_metrics_processed", | ||
Help: "Total number of metrics processed", | ||
}) | ||
c.metricsOutgoing = prometheus_client.NewCounter(prometheus_client.CounterOpts{ | ||
Name: "alloy_prometheus_mapping_metrics_written", | ||
Help: "Total number of metrics written", | ||
}) | ||
|
||
for _, metric := range []prometheus_client.Collector{c.metricsProcessed, c.metricsOutgoing} { | ||
err = o.Registerer.Register(metric) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, c.ls) | ||
c.receiver = prometheus.NewInterceptor( | ||
c.fanout, | ||
c.ls, | ||
prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { | ||
if c.exited.Load() { | ||
return 0, fmt.Errorf("%s has exited", o.ID) | ||
} | ||
|
||
newLbl := c.mapping(v, l) | ||
if newLbl.IsEmpty() { | ||
return 0, nil | ||
} | ||
c.metricsOutgoing.Inc() | ||
return next.Append(0, newLbl, t, v) | ||
}), | ||
prometheus.WithExemplarHook(func(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { | ||
if c.exited.Load() { | ||
return 0, fmt.Errorf("%s has exited", o.ID) | ||
} | ||
|
||
newLbl := c.mapping(0, l) | ||
if newLbl.IsEmpty() { | ||
return 0, nil | ||
} | ||
return next.AppendExemplar(0, newLbl, e) | ||
}), | ||
prometheus.WithMetadataHook(func(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) { | ||
if c.exited.Load() { | ||
return 0, fmt.Errorf("%s has exited", o.ID) | ||
} | ||
|
||
newLbl := c.mapping(0, l) | ||
if newLbl.IsEmpty() { | ||
return 0, nil | ||
} | ||
return next.UpdateMetadata(0, newLbl, m) | ||
}), | ||
prometheus.WithHistogramHook(func(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) { | ||
if c.exited.Load() { | ||
return 0, fmt.Errorf("%s has exited", o.ID) | ||
} | ||
|
||
newLbl := c.mapping(0, l) | ||
if newLbl.IsEmpty() { | ||
return 0, nil | ||
} | ||
return next.AppendHistogram(0, newLbl, t, h, fh) | ||
}), | ||
) | ||
|
||
// Immediately export the receiver which remains the same for the component | ||
// lifetime. | ||
o.OnStateChange(Exports{Receiver: c.receiver}) | ||
|
||
// Call to Update() to set the relabelling rules once at the start. | ||
if err = c.Update(args); err != nil { | ||
return nil, err | ||
} | ||
|
||
return c, nil | ||
} | ||
|
||
// Run implements component.Component. | ||
func (c *Component) Run(ctx context.Context) error { | ||
defer c.exited.Store(true) | ||
|
||
<-ctx.Done() | ||
return nil | ||
} | ||
|
||
// Update implements component.Component. | ||
func (c *Component) Update(args component.Arguments) error { | ||
c.mut.Lock() | ||
defer c.mut.Unlock() | ||
|
||
newArgs := args.(Arguments) | ||
c.sourceLabel = newArgs.SourceLabel | ||
c.targetLabel = newArgs.TargetLabel | ||
c.mappings = newArgs.LabelValuesMapping | ||
c.fanout.UpdateChildren(newArgs.ForwardTo) | ||
|
||
c.opts.OnStateChange(Exports{Receiver: c.receiver}) | ||
|
||
return nil | ||
} | ||
|
||
func (c *Component) mapping(val float64, lbls labels.Labels) labels.Labels { | ||
// Relabel against a copy of the labels to prevent modifying the original | ||
// slice. | ||
lb := labels.NewBuilder(lbls.Copy()) | ||
sourceValue := lb.Get(c.sourceLabel) | ||
targetValue := c.mappings[sourceValue] | ||
lb.Set(c.targetLabel, targetValue) | ||
newLabels := lb.Labels() | ||
|
||
componentID := livedebugging.ComponentID(c.opts.ID) | ||
if c.debugDataPublisher.IsActive(componentID) { | ||
c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lbls.String(), newLabels.String())) | ||
} | ||
|
||
return newLabels | ||
} | ||
|
||
func (c *Component) LiveDebugging(_ int) {} |
Oops, something went wrong.