diff --git a/CHANGELOG.md b/CHANGELOG.md index 28fc106bc5..7bf2eda88c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,8 @@ v1.5.0 - (_Experimental_) Add the `array.combine_maps` function to the stdlib. (@ptodev, @wildum) +- Add a `prometheus.mapping` component to add a label based on a source_label and a mapping table. (@vaxvms) + ### Enhancements - The `mimir.rules.kubernetes` component now supports adding extra label matchers diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index d55681be97..87251c0046 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -173,6 +173,7 @@ The following components, grouped by namespace, _export_ Prometheus `MetricsRece {{< /collapse >}} {{< collapse title="prometheus" >}} +- [prometheus.mapping](../components/prometheus/prometheus.mapping) - [prometheus.relabel](../components/prometheus/prometheus.relabel) - [prometheus.remote_write](../components/prometheus/prometheus.remote_write) - [prometheus.write.queue](../components/prometheus/prometheus.write.queue) @@ -192,6 +193,7 @@ The following components, grouped by namespace, _consume_ Prometheus `MetricsRec {{< /collapse >}} {{< collapse title="prometheus" >}} +- [prometheus.mapping](../components/prometheus/prometheus.mapping) - [prometheus.operator.podmonitors](../components/prometheus/prometheus.operator.podmonitors) - [prometheus.operator.probes](../components/prometheus/prometheus.operator.probes) - [prometheus.operator.servicemonitors](../components/prometheus/prometheus.operator.servicemonitors) diff --git a/docs/sources/reference/components/prometheus/prometheus.mapping.md b/docs/sources/reference/components/prometheus/prometheus.mapping.md new file mode 100644 index 0000000000..744958b1ff --- /dev/null +++ b/docs/sources/reference/components/prometheus/prometheus.mapping.md @@ -0,0 +1,136 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.mapping/ +description: Learn about prometheus.mapping +title: prometheus.mapping +--- + +# prometheus.mapping + +Prometheus metrics follow the [OpenMetrics](https://openmetrics.io/) format. +Each time series is uniquely identified by its metric name, plus optional +key-value pairs called labels. Each sample represents a datapoint in the +time series and contains a value and an optional timestamp. + +```text +{=, = ...} [timestamp] +``` + +The `prometheus.mapping` component create a new label on each metric passed +along to the exported receiver by applying a mapping table to a label value. + +The most common use of `prometheus.mapping` is to create a new label-based filter +Prometheus metrics or standardize the label set that is passed to one or more +downstream receivers. + +You can specify multiple `prometheus.mapping` components by givin them different labels. +different labels. + +## Usage + +```alloy +prometheus.mapping "LABEL" { + forward_to = RECEIVER_LIST + + source_label = "labelA" + target_label = "labelB" + + mapping = { + "from" = "to", + ... + } +} +``` + +## Arguments + +The following arguments are supported: + +Name | Type | Description | Default | Required +---------------|-------------------------|-------------------------------------------------------------------------|---------|--------- +`forward_to` | `list(MetricsReceiver)` | Where the metrics should be forwarded to, after relabeling takes place. | | yes +`source_label` | `string` | Name of the source label to use for mapping. | | yes +`target_label` | `string` | Name of the target label to use for mapping. | | yes +`mapping` | `map(string,string)` | Mapping from source label value to target label value. | | yes + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +-----------|-------------------|----------------------------------------------------------- +`receiver` | `MetricsReceiver` | The input receiver where samples are sent to be relabeled. + +## Component health + +`prometheus.mapping` is only reported as unhealthy if given an invalid +configuration. In those cases, exported fields are kept at their last healthy +values. + +## Debug information + +`prometheus.mapping` doesn't expose any component-specific debug information. + +## Debug metrics + +* `prometheus_mapping_metrics_processed` (counter): Total number of metrics processed. +* `prometheus_mapping_metrics_written` (counter): Total number of metrics written. + +## Example + +You can create and an instance of a see `prometheus.mapping` component and see how +it acts on the following metrics. + +```alloy +prometheus.mapping "keep_backend_only" { + forward_to = [prometheus.remote_write.onprem.receiver] + + source_label = "app" + target_label = "team" + + mapping = { + "frontend" = "teamA" + "backend" = "teamB" + "database" = "teamC" + } +} +``` + +```text +metric_a{__address__ = "localhost", instance = "development", app = "frontend"} 10 +metric_a{__address__ = "localhost", instance = "development", app = "backend"} 2 +metric_a{__address__ = "cluster_a", instance = "production", app = "frontend"} 7 +metric_a{__address__ = "cluster_a", instance = "production", app = "backend"} 9 +metric_a{__address__ = "cluster_b", instance = "production", app = "database"} 4 +``` + +After applying the mapping a new `team` label is created based on mapping table +and `app` label value. + +```text +metric_a{team = "teamA", __address__ = "localhost", instance = "development", app = "frontend"} 10 +metric_a{team = "teamB", __address__ = "localhost", instance = "development", app = "backend"} 2 +metric_a{host = "teamA", __address__ = "cluster_a", instance = "production", app = "frontend"} 7 +metric_a{host = "teamA", __address__ = "cluster_a", instance = "production", app = "backend"} 9 +metric_a{host = "teamC", __address__ = "cluster_a", instance = "production", app = "database"} 4 +``` + +The resulting metrics are then propagated to each receiver defined in the +`forward_to` argument. + + +## Compatible components + +`prometheus.mapping` can accept arguments from the following components: + +- Components that export [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-exporters) + +`prometheus.mapping` has exports that can be consumed by the following components: + +- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 4f58d9fbe6..5f2cba1ea2 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -130,6 +130,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/exporter/statsd" // Import prometheus.exporter.statsd _ "github.com/grafana/alloy/internal/component/prometheus/exporter/unix" // Import prometheus.exporter.unix _ "github.com/grafana/alloy/internal/component/prometheus/exporter/windows" // Import prometheus.exporter.windows + _ "github.com/grafana/alloy/internal/component/prometheus/mapping" // Import prometheus.mapping _ "github.com/grafana/alloy/internal/component/prometheus/operator/podmonitors" // Import prometheus.operator.podmonitors _ "github.com/grafana/alloy/internal/component/prometheus/operator/probes" // Import prometheus.operator.probes _ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors diff --git a/internal/component/prometheus/mapping/mapping.go b/internal/component/prometheus/mapping/mapping.go new file mode 100644 index 0000000000..5c3c5431fa --- /dev/null +++ b/internal/component/prometheus/mapping/mapping.go @@ -0,0 +1,227 @@ +package mapping + +import ( + "context" + "fmt" + "sync" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus" + "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" + prometheus_client "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "go.uber.org/atomic" +) + +const name = "prometheus.mapping" + +func init() { + component.Register(component.Registration{ + Name: name, + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + Exports: Exports{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return New(opts, args.(Arguments)) + }, + }) +} + +// Arguments holds values which are used to configure the prometheus.relabel +// component. +type Arguments struct { + // Where the relabelled metrics should be forwarded to. + ForwardTo []storage.Appendable `alloy:"forward_to,attr"` + + // Labels to use for mapping + SourceLabel string `alloy:"source_label,attr"` + TargetLabel string `alloy:"target_label,attr"` + + // Mapping + LabelValuesMapping map[string]string `alloy:"mapping,attr"` +} + +// SetToDefault implements syntax.Defaulter. +func (arg *Arguments) SetToDefault() { +} + +// Validate implements syntax.Validator. +func (arg *Arguments) Validate() error { + return nil +} + +// Exports holds values which are exported by the prometheus.relabel component. +type Exports struct { + Receiver storage.Appendable `alloy:"receiver,attr"` +} + +// Component implements the prometheus.mapping component. +type Component struct { + sourceLabel string + targetLabel string + + mappings map[string]string + mut sync.RWMutex + opts component.Options + receiver *prometheus.Interceptor + metricsProcessed prometheus_client.Counter + metricsOutgoing prometheus_client.Counter + fanout *prometheus.Fanout + exited atomic.Bool + ls labelstore.LabelStore + + debugDataPublisher livedebugging.DebugDataPublisher +} + +var ( + _ component.Component = (*Component)(nil) + _ component.LiveDebugging = (*Component)(nil) +) + +// New creates a new prometheus.relabel component. +func New(o component.Options, args Arguments) (*Component, error) { + debugDataPublisher, err := o.GetServiceData(livedebugging.ServiceName) + if err != nil { + return nil, err + } + + data, err := o.GetServiceData(labelstore.ServiceName) + if err != nil { + return nil, err + } + c := &Component{ + opts: o, + ls: data.(labelstore.LabelStore), + sourceLabel: args.SourceLabel, + targetLabel: args.TargetLabel, + mappings: args.LabelValuesMapping, + debugDataPublisher: debugDataPublisher.(livedebugging.DebugDataPublisher), + } + c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "alloy_prometheus_mapping_metrics_processed", + Help: "Total number of metrics processed", + }) + c.metricsOutgoing = prometheus_client.NewCounter(prometheus_client.CounterOpts{ + Name: "alloy_prometheus_mapping_metrics_written", + Help: "Total number of metrics written", + }) + + for _, metric := range []prometheus_client.Collector{c.metricsProcessed, c.metricsOutgoing} { + err = o.Registerer.Register(metric) + if err != nil { + return nil, err + } + } + + c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID, o.Registerer, c.ls) + c.receiver = prometheus.NewInterceptor( + c.fanout, + c.ls, + prometheus.WithAppendHook(func(_ storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(v, l) + if newLbl.IsEmpty() { + return 0, nil + } + c.metricsOutgoing.Inc() + return next.Append(0, newLbl, t, v) + }), + prometheus.WithExemplarHook(func(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(0, l) + if newLbl.IsEmpty() { + return 0, nil + } + return next.AppendExemplar(0, newLbl, e) + }), + prometheus.WithMetadataHook(func(_ storage.SeriesRef, l labels.Labels, m metadata.Metadata, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(0, l) + if newLbl.IsEmpty() { + return 0, nil + } + return next.UpdateMetadata(0, newLbl, m) + }), + prometheus.WithHistogramHook(func(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram, next storage.Appender) (storage.SeriesRef, error) { + if c.exited.Load() { + return 0, fmt.Errorf("%s has exited", o.ID) + } + + newLbl := c.mapping(0, l) + if newLbl.IsEmpty() { + return 0, nil + } + return next.AppendHistogram(0, newLbl, t, h, fh) + }), + ) + + // Immediately export the receiver which remains the same for the component + // lifetime. + o.OnStateChange(Exports{Receiver: c.receiver}) + + // Call to Update() to set the relabelling rules once at the start. + if err = c.Update(args); err != nil { + return nil, err + } + + return c, nil +} + +// Run implements component.Component. +func (c *Component) Run(ctx context.Context) error { + defer c.exited.Store(true) + + <-ctx.Done() + return nil +} + +// Update implements component.Component. +func (c *Component) Update(args component.Arguments) error { + c.mut.Lock() + defer c.mut.Unlock() + + newArgs := args.(Arguments) + c.sourceLabel = newArgs.SourceLabel + c.targetLabel = newArgs.TargetLabel + c.mappings = newArgs.LabelValuesMapping + c.fanout.UpdateChildren(newArgs.ForwardTo) + + c.opts.OnStateChange(Exports{Receiver: c.receiver}) + + return nil +} + +func (c *Component) mapping(val float64, lbls labels.Labels) labels.Labels { + // Relabel against a copy of the labels to prevent modifying the original + // slice. + lb := labels.NewBuilder(lbls.Copy()) + sourceValue := lb.Get(c.sourceLabel) + targetValue := c.mappings[sourceValue] + lb.Set(c.targetLabel, targetValue) + newLabels := lb.Labels() + + componentID := livedebugging.ComponentID(c.opts.ID) + if c.debugDataPublisher.IsActive(componentID) { + c.debugDataPublisher.Publish(componentID, fmt.Sprintf("%s => %s", lbls.String(), newLabels.String())) + } + + return newLabels +} + +func (c *Component) LiveDebugging(_ int) {} diff --git a/internal/component/prometheus/mapping/mapping_test.go b/internal/component/prometheus/mapping/mapping_test.go new file mode 100644 index 0000000000..18b4ed3368 --- /dev/null +++ b/internal/component/prometheus/mapping/mapping_test.go @@ -0,0 +1,82 @@ +package mapping + +import ( + "fmt" + "testing" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" + "github.com/grafana/alloy/internal/util" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func TestValidator(t *testing.T) { + args := Arguments{} + err := args.Validate() + require.NoError(t, err) +} + +func TestMapping(t *testing.T) { + mapper := generateMapping(t) + lbls := labels.FromStrings("source", "value1") + newLbls := mapper.mapping(0, lbls) + require.True(t, newLbls.Has("target")) +} + +func TestMappingEmptySourceLabelValue(t *testing.T) { + mapper := generateMapping(t) + lbls := labels.FromStrings("source", "") + newLbls := mapper.mapping(0, lbls) + require.True(t, newLbls.Has("target")) + require.Equal(t, newLbls.Get("target"), "empty") +} + +func TestMappingEmptyTargetLabelValue(t *testing.T) { + mapper := generateMapping(t) + lbls := labels.FromStrings("source", "value2") + newLbls := mapper.mapping(0, lbls) + require.False(t, newLbls.Has("target")) +} + +func generateMapping(t *testing.T) *Component { + ls := labelstore.New(nil, prom.DefaultRegisterer) + fanout := prometheus.NewInterceptor(nil, ls, prometheus.WithAppendHook(func(ref storage.SeriesRef, l labels.Labels, _ int64, _ float64, _ storage.Appender) (storage.SeriesRef, error) { + require.True(t, l.Has("new_label")) + return ref, nil + })) + mapper, err := New(component.Options{ + ID: "1", + Logger: util.TestAlloyLogger(t), + OnStateChange: func(e component.Exports) {}, + Registerer: prom.NewRegistry(), + GetServiceData: getServiceData, + }, Arguments{ + ForwardTo: []storage.Appendable{fanout}, + SourceLabel: "source", + TargetLabel: "target", + LabelValuesMapping: map[string]string{ + "": "empty", + "value1": "eulav", + "value2": "", + }, + }) + require.NotNil(t, mapper) + require.NoError(t, err) + return mapper +} + +func getServiceData(name string) (interface{}, error) { + switch name { + case labelstore.ServiceName: + return labelstore.New(nil, prom.DefaultRegisterer), nil + case livedebugging.ServiceName: + return livedebugging.NewLiveDebugging(), nil + default: + return nil, fmt.Errorf("service not found %s", name) + } +}