From 051378918a6e1802ad5a5157f44053b85dc29882 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Tue, 1 Nov 2022 11:23:08 -0400 Subject: [PATCH] Convert to using prometheus appendable (#2431) * initial commit of switching over to using appendable * comments * simplify fanout * Switch to using a common fanout appendable. * Merge changes * Fix lint * Add check * remove panics * PR feedback * fix error * separate fanout and intercept patterns (#2447) * separate fanout and intercept patterns * testing flow * Add middleware style next to interceptor. * Simplify the remote_write and add multierror. --- component/common/appendable/appendable.go | 111 --------------- component/prometheus/fanout.go | 107 +++++++++++++++ component/prometheus/globalrefmap.go | 9 +- component/prometheus/globalrefmap_test.go | 25 ++-- component/prometheus/interceptor.go | 116 ++++++++++++++++ component/prometheus/receiver.go | 64 --------- component/prometheus/receiver_test.go | 43 ------ component/prometheus/relabel/relabel.go | 90 +++++++------ component/prometheus/relabel/relabel_test.go | 126 +++++++----------- .../prometheus/remotewrite/remote_write.go | 58 +++----- .../remotewrite/remote_write_test.go | 13 +- component/prometheus/remotewrite/types.go | 4 +- component/prometheus/scrape/scrape.go | 14 +- component/prometheus/scrape/scrape_test.go | 35 +++-- 14 files changed, 385 insertions(+), 430 deletions(-) delete mode 100644 component/common/appendable/appendable.go create mode 100644 component/prometheus/fanout.go create mode 100644 component/prometheus/interceptor.go delete mode 100644 component/prometheus/receiver.go delete mode 100644 component/prometheus/receiver_test.go diff --git a/component/common/appendable/appendable.go b/component/common/appendable/appendable.go deleted file mode 100644 index c34900ce7bbd..000000000000 --- a/component/common/appendable/appendable.go +++ /dev/null @@ -1,111 +0,0 @@ -package appendable - -import ( - "context" - "sync" - - "github.com/grafana/agent/component/prometheus" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/storage" -) - -// FlowMetric is a wrapper around a single sample without the timestamp. -type FlowMetric struct { - Labels labels.Labels - Value float64 -} - -// FlowAppendable is a flow-specific implementation of an Appender. -type FlowAppendable struct { - mut sync.RWMutex - receivers []*prometheus.Receiver -} - -// NewFlowAppendable initializes the appendable. -func NewFlowAppendable(receivers ...*prometheus.Receiver) *FlowAppendable { - return &FlowAppendable{ - receivers: receivers, - } -} - -type flowAppender struct { - buffer map[int64][]*prometheus.FlowMetric // Though mostly a map of 1 item, this allows it to work if more than one TS gets added - receivers []*prometheus.Receiver -} - -// Appender implements the Prometheus Appendable interface. -func (app *FlowAppendable) Appender(_ context.Context) storage.Appender { - app.mut.RLock() - defer app.mut.RUnlock() - - return &flowAppender{ - buffer: make(map[int64][]*prometheus.FlowMetric), - receivers: app.receivers, - } -} - -// SetReceivers defines the list of receivers for this appendable. -func (app *FlowAppendable) SetReceivers(receivers []*prometheus.Receiver) { - app.mut.Lock() - app.receivers = receivers - app.mut.Unlock() -} - -// ListReceivers is a test method for exposing the Appender's receivers. -func (app *FlowAppendable) ListReceivers() []*prometheus.Receiver { - app.mut.RLock() - defer app.mut.RUnlock() - return app.receivers -} - -func (app *flowAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - if len(app.receivers) == 0 { - return 0, nil - } - _, found := app.buffer[t] - if !found { - set := make([]*prometheus.FlowMetric, 0) - app.buffer[t] = set - } - // If ref is 0 then lets grab a global id - if ref == 0 { - ref = storage.SeriesRef(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(l)) - } - // If it is stale then we can remove it - if value.IsStaleNaN(v) { - prometheus.GlobalRefMapping.AddStaleMarker(uint64(ref), l) - } else { - prometheus.GlobalRefMapping.RemoveStaleMarker(uint64(ref)) - } - app.buffer[t] = append(app.buffer[t], prometheus.NewFlowMetric(uint64(ref), l, v)) - return ref, nil -} - -func (app *flowAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - return 0, nil -} - -func (app *flowAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - return 0, nil -} - -func (app *flowAppender) Commit() error { - for _, r := range app.receivers { - for ts, metrics := range app.buffer { - if r == nil || r.Receive == nil { - continue - } - r.Receive(ts, metrics) - } - } - app.buffer = make(map[int64][]*prometheus.FlowMetric) - return nil -} - -func (app *flowAppender) Rollback() error { - app.buffer = make(map[int64][]*prometheus.FlowMetric) - return nil -} diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go new file mode 100644 index 000000000000..47671d20c731 --- /dev/null +++ b/component/prometheus/fanout.go @@ -0,0 +1,107 @@ +package prometheus + +import ( + "context" + "fmt" + "sync" + + "github.com/hashicorp/go-multierror" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + + "github.com/prometheus/prometheus/storage" +) + +var _ storage.Appendable = (*Fanout)(nil) + +// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. +type Fanout struct { + mut sync.RWMutex + // children is where to fan out. + children []storage.Appendable + // ComponentID is what component this belongs to. + componentID string +} + +// NewFanout creates a fanout appendable. +func NewFanout(children []storage.Appendable, componentID string) *Fanout { + return &Fanout{ + children: children, + componentID: componentID, + } +} + +// UpdateChildren allows changing of the children of the fanout. +func (f *Fanout) UpdateChildren(children []storage.Appendable) { + f.mut.Lock() + defer f.mut.Unlock() + f.children = children +} + +// Appender satisfies the Appendable interface. +func (f *Fanout) Appender(ctx context.Context) storage.Appender { + f.mut.RLock() + defer f.mut.RUnlock() + + app := &appender{ + children: make([]storage.Appender, 0), + componentID: f.componentID, + } + for _, x := range f.children { + if x == nil { + continue + } + app.children = append(app.children, x.Appender(ctx)) + } + return app +} + +var _ storage.Appender = (*appender)(nil) + +type appender struct { + children []storage.Appender + componentID string +} + +// Append satisfies the Appender interface. +func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if ref == 0 { + ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + } + var multiErr error + for _, x := range a.children { + _, err := x.Append(ref, l, t, v) + if err != nil { + multiErr = multierror.Append(multiErr, err) + } + } + return ref, multiErr +} + +// Commit satisfies the Appender interface. +func (a *appender) Commit() error { + for _, x := range a.children { + _ = x.Commit() + } + return nil +} + +// Rollback satisifies the Appender interface. +func (a *appender) Rollback() error { + for _, x := range a.children { + _, _ = x, a.Rollback() + } + return nil +} + +// AppendExemplar satisfies the Appender interface. +func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + return 0, fmt.Errorf("appendExemplar not supported yet") +} + +// UpdateMetadata satisifies the Appender interface. +func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + return 0, fmt.Errorf("updateMetadata not supported yet") +} diff --git a/component/prometheus/globalrefmap.go b/component/prometheus/globalrefmap.go index 8196c558f3bf..06aa88a732f7 100644 --- a/component/prometheus/globalrefmap.go +++ b/component/prometheus/globalrefmap.go @@ -44,7 +44,7 @@ func newGlobalRefMap() *GlobalRefMap { } // GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id. -func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *FlowMetric) uint64 { +func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 { g.mut.Lock() defer g.mut.Unlock() @@ -59,7 +59,7 @@ func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *F g.mappings[componentID] = m } - labelHash := fm.labels.Hash() + labelHash := lbls.Hash() globalID, found := g.labelsHashToGlobal[labelHash] if found { m.localToGlobal[localRefID] = globalID @@ -79,6 +79,11 @@ func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 { g.mut.Lock() defer g.mut.Unlock() + // Guard against bad input. + if l == nil { + return 0 + } + labelHash := l.Hash() globalID, found := g.labelsHashToGlobal[labelHash] if found { diff --git a/component/prometheus/globalrefmap_test.go b/component/prometheus/globalrefmap_test.go index d5ce1d83b398..096a52a29fe8 100644 --- a/component/prometheus/globalrefmap_test.go +++ b/component/prometheus/globalrefmap_test.go @@ -48,8 +48,7 @@ func TestAddingLocalMapping(t *testing.T) { }) globalID := mapping.GetOrAddGlobalRefID(l) - fm := NewFlowMetric(globalID, l, 0) - shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm) + shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l) require.True(t, globalID == shouldBeSameGlobalID) require.Len(t, mapping.labelsHashToGlobal, 1) require.Len(t, mapping.mappings, 1) @@ -67,9 +66,8 @@ func TestAddingLocalMappings(t *testing.T) { }) globalID := mapping.GetOrAddGlobalRefID(l) - fm := NewFlowMetric(globalID, l, 0) - shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm) - shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm) + shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l) + shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l) require.True(t, globalID == shouldBeSameGlobalID) require.True(t, globalID == shouldBeSameGlobalID2) require.Len(t, mapping.labelsHashToGlobal, 1) @@ -92,10 +90,8 @@ func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) { Value: "test", }) - fm := NewFlowMetric(1, l, 0) - - shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm) - shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm) + shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l) + shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l) require.True(t, shouldBeSameGlobalID2 == shouldBeSameGlobalID) require.Len(t, mapping.labelsHashToGlobal, 1) require.Len(t, mapping.mappings, 2) @@ -122,11 +118,8 @@ func TestStaleness(t *testing.T) { Value: "test2", }) - fm := NewFlowMetric(0, l, 0) - fm2 := NewFlowMetric(0, l2, 0) - - global1 := mapping.GetOrAddLink("1", 1, fm) - _ = mapping.GetOrAddLink("2", 1, fm2) + global1 := mapping.GetOrAddLink("1", 1, l) + _ = mapping.GetOrAddLink("2", 1, l2) mapping.AddStaleMarker(global1, l) require.Len(t, mapping.staleGlobals, 1) require.Len(t, mapping.labelsHashToGlobal, 2) @@ -145,9 +138,7 @@ func TestRemovingStaleness(t *testing.T) { Value: "test", }) - fm := NewFlowMetric(0, l, 0) - - global1 := mapping.GetOrAddLink("1", 1, fm) + global1 := mapping.GetOrAddLink("1", 1, l) mapping.AddStaleMarker(global1, l) require.Len(t, mapping.staleGlobals, 1) mapping.RemoveStaleMarker(global1) diff --git a/component/prometheus/interceptor.go b/component/prometheus/interceptor.go new file mode 100644 index 000000000000..dca9c88341f5 --- /dev/null +++ b/component/prometheus/interceptor.go @@ -0,0 +1,116 @@ +package prometheus + +import ( + "context" + "fmt" + "sync" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +// Intercept func allows interceptor owners to inject custom behavior. +type Intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) + +// Interceptor supports the concept of an appendable/appender that you can add a func to be called before +// the values are sent to the child appendable/append. +type Interceptor struct { + mut sync.RWMutex + // intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated. + // Intercept shouuld be thread safe and can be called across appenders. + intercept Intercept + // next is where to send the next command. + next storage.Appendable + + // ComponentID is what component this belongs to. + componentID string +} + +// NewInterceptor creates a interceptor appendable. +func NewInterceptor(inter Intercept, next storage.Appendable, componentID string) (*Interceptor, error) { + if inter == nil { + return nil, fmt.Errorf("intercept cannot be null for component %s", componentID) + } + return &Interceptor{ + intercept: inter, + next: next, + componentID: componentID, + }, nil +} + +// UpdateChild allows changing of the child of the interceptor. +func (f *Interceptor) UpdateChild(child storage.Appendable) { + f.mut.Lock() + defer f.mut.Unlock() + + f.next = child +} + +// Appender satisfies the Appendable interface. +func (f *Interceptor) Appender(ctx context.Context) storage.Appender { + f.mut.RLock() + defer f.mut.RUnlock() + + app := &interceptappender{ + intercept: f.intercept, + componentID: f.componentID, + } + if f.next != nil { + app.child = f.next.Appender(ctx) + } + return app +} + +var _ storage.Appender = (*appender)(nil) + +type interceptappender struct { + child storage.Appender + componentID string + intercept Intercept +} + +// Append satisfies the Appender interface. +func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if ref == 0 { + ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + } + return a.intercept(ref, l, t, v, a.child) +} + +// Commit satisfies the Appender interface. +func (a *interceptappender) Commit() error { + if a.child == nil { + return nil + } + return a.child.Commit() +} + +// Rollback satisifies the Appender interface. +func (a *interceptappender) Rollback() error { + if a.child == nil { + return nil + } + return a.child.Rollback() +} + +// AppendExemplar satisfies the Appender interface. +func (a *interceptappender) AppendExemplar( + ref storage.SeriesRef, + l labels.Labels, + e exemplar.Exemplar, +) (storage.SeriesRef, error) { + + return 0, fmt.Errorf("appendExemplar not supported yet") +} + +// UpdateMetadata satisifies the Appender interface. +func (a *interceptappender) UpdateMetadata( + ref storage.SeriesRef, + l labels.Labels, + m metadata.Metadata, +) (storage.SeriesRef, error) { + + return 0, fmt.Errorf("updateMetadata not supported yet") +} diff --git a/component/prometheus/receiver.go b/component/prometheus/receiver.go deleted file mode 100644 index d0f132933c4b..000000000000 --- a/component/prometheus/receiver.go +++ /dev/null @@ -1,64 +0,0 @@ -package prometheus - -import ( - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" -) - -// Receiver is used to pass an array of metrics to another receiver -type Receiver struct { - // metrics should be considered immutable - Receive func(timestamp int64, metrics []*FlowMetric) -} - -// RiverCapsule marks receivers as a capsule. -func (r Receiver) RiverCapsule() {} - -// FlowMetric is a wrapper around a single metric without the timestamp. -type FlowMetric struct { - globalRefID uint64 - labels labels.Labels - value float64 -} - -// NewFlowMetric instantiates a new flow metric -func NewFlowMetric(globalRefID uint64, lbls labels.Labels, value float64) *FlowMetric { - // Always ensure we have a valid global ref id - if globalRefID == 0 { - globalRefID = GlobalRefMapping.GetOrAddGlobalRefID(lbls) - } - return &FlowMetric{ - globalRefID: globalRefID, - labels: lbls, - value: value, - } -} - -// GlobalRefID Retrieves the GlobalRefID -func (fw *FlowMetric) GlobalRefID() uint64 { return fw.globalRefID } - -// Value returns the value -func (fw *FlowMetric) Value() float64 { return fw.value } - -// LabelsCopy returns a copy of the labels structure -func (fw *FlowMetric) LabelsCopy() labels.Labels { - return fw.labels.Copy() -} - -// RawLabels returns the actual underlying labels that SHOULD be treated as immutable. Usage of this -// must be very careful to ensure that nothing that consume this mutates labels in anyway. -func (fw *FlowMetric) RawLabels() labels.Labels { - return fw.labels -} - -// Relabel applies normal prometheus relabel rules and returns a flow metric. NOTE this may return itself. -func (fw *FlowMetric) Relabel(cfgs ...*relabel.Config) *FlowMetric { - retLbls := relabel.Process(fw.labels, cfgs...) - if retLbls == nil { - return nil - } - if retLbls.Hash() == fw.labels.Hash() && labels.Equal(retLbls, fw.labels) { - return fw - } - return NewFlowMetric(0, retLbls, fw.value) -} diff --git a/component/prometheus/receiver_test.go b/component/prometheus/receiver_test.go deleted file mode 100644 index 642c883901c9..000000000000 --- a/component/prometheus/receiver_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package prometheus - -import ( - "testing" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" - "github.com/stretchr/testify/require" -) - -func TestRelabel(t *testing.T) { - fm := NewFlowMetric(0, labels.FromStrings("key", "value"), 0) - require.True(t, fm.globalRefID != 0) - rg, _ := relabel.NewRegexp("(.*)") - newfm := fm.Relabel(&relabel.Config{ - Replacement: "${1}_new", - Action: "replace", - TargetLabel: "new", - Regex: rg, - }) - require.Len(t, fm.labels, 1) - require.True(t, fm.labels.Has("key")) - - require.Len(t, newfm.labels, 2) - require.True(t, newfm.labels.Has("new")) -} - -func TestRelabelTheSame(t *testing.T) { - fm := NewFlowMetric(0, labels.FromStrings("key", "value"), 0) - require.True(t, fm.globalRefID != 0) - rg, _ := relabel.NewRegexp("bad") - newfm := fm.Relabel(&relabel.Config{ - Replacement: "${1}_new", - Action: "replace", - TargetLabel: "new", - Regex: rg, - }) - require.Len(t, fm.labels, 1) - require.True(t, fm.labels.Has("key")) - require.Len(t, newfm.labels, 1) - require.True(t, newfm.globalRefID == fm.globalRefID) - require.True(t, labels.Equal(newfm.labels, fm.labels)) -} diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index 87cfcfbd91a0..d928810010bc 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "github.com/prometheus/prometheus/storage" + "github.com/grafana/agent/component" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/prometheus" @@ -29,7 +31,7 @@ func init() { // component. type Arguments struct { // Where the relabelled metrics should be forwarded to. - ForwardTo []*prometheus.Receiver `river:"forward_to,attr"` + ForwardTo []storage.Appendable `river:"forward_to,attr"` // The relabelling rules to apply to each metric before it's forwarded. MetricRelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"` @@ -37,7 +39,7 @@ type Arguments struct { // Exports holds values which are exported by the prometheus.relabel component. type Exports struct { - Receiver *prometheus.Receiver `river:"receiver,attr"` + Receiver storage.Appendable `river:"receiver,attr"` } // Component implements the prometheus.relabel component. @@ -45,15 +47,17 @@ type Component struct { mut sync.RWMutex opts component.Options mrc []*relabel.Config - forwardto []*prometheus.Receiver - receiver *prometheus.Receiver + receiver *prometheus.Interceptor metricsProcessed prometheus_client.Counter + fanout *prometheus.Fanout cacheMut sync.RWMutex cache map[uint64]*labelAndID } -var _ component.Component = (*Component)(nil) +var ( + _ component.Component = (*Component)(nil) +) // New creates a new prometheus.relabel component. func New(o component.Options, args Arguments) (*Component, error) { @@ -61,7 +65,6 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, cache: make(map[uint64]*labelAndID), } - c.receiver = &prometheus.Receiver{Receive: c.Receive} c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{ Name: "agent_prometheus_relabel_metrics_processed", Help: "Total number of metrics processed", @@ -72,6 +75,18 @@ func New(o component.Options, args Arguments) (*Component, error) { return nil, err } + c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID) + c.receiver, err = prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + newLbl := c.relabel(v, l) + if newLbl == nil { + return 0, nil + } + return next.Append(0, newLbl, t, v) + }, c.fanout, c.opts.ID) + + if err != nil { + return nil, err + } // Immediately export the receiver which remains the same for the component // lifetime. o.OnStateChange(Exports{Receiver: c.receiver}) @@ -99,49 +114,39 @@ func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) c.clearCache() c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs) - c.forwardto = newArgs.ForwardTo + c.fanout.UpdateChildren(newArgs.ForwardTo) + c.opts.OnStateChange(Exports{Receiver: c.receiver}) return nil } -// Receive implements the receiver.Receive func that allows an array of metrics -// to be passed around. -func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) { +func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { c.mut.RLock() defer c.mut.RUnlock() - relabelledMetrics := make([]*prometheus.FlowMetric, 0) - for _, m := range metricArr { - // Relabel may return the original flowmetric if no changes applied, nil if everything was removed or an entirely new flowmetric. - var relabelledFm *prometheus.FlowMetric - lbls, found := c.getFromCache(m.GlobalRefID()) - if found { - // If lbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels - if lbls != nil { - relabelledFm = prometheus.NewFlowMetric(lbls.id, lbls.labels, m.Value()) - } - } else { - relabelledFm = m.Relabel(c.mrc...) - c.addToCache(m.GlobalRefID(), relabelledFm) - } - - // If stale remove from the cache, the reason we don't exit early is so the stale value can propagate. - // TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple. - // In the future the global ref cache may have some hooks to allow notification of when caches should be evicted. - if value.IsStaleNaN(m.Value()) { - c.deleteFromCache(m.GlobalRefID()) + globalRef := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) + var relabelled labels.Labels + newLbls, found := c.getFromCache(globalRef) + if found { + // If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels + if newLbls != nil { + relabelled = newLbls.labels } - if relabelledFm == nil { - continue - } - relabelledMetrics = append(relabelledMetrics, relabelledFm) + } else { + relabelled = relabel.Process(lbls, c.mrc...) + c.addToCache(globalRef, relabelled) } - if len(relabelledMetrics) == 0 { - return + + // If stale remove from the cache, the reason we don't exit early is so the stale value can propagate. + // TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple. + // In the future the global ref cache may have some hooks to allow notification of when caches should be evicted. + if value.IsStaleNaN(val) { + c.deleteFromCache(globalRef) } - for _, forward := range c.forwardto { - forward.Receive(ts, relabelledMetrics) + if relabelled == nil { + return nil } + return relabelled } func (c *Component) getFromCache(id uint64) (*labelAndID, bool) { @@ -166,17 +171,18 @@ func (c *Component) clearCache() { c.cache = make(map[uint64]*labelAndID) } -func (c *Component) addToCache(originalID uint64, fm *prometheus.FlowMetric) { +func (c *Component) addToCache(originalID uint64, lbls labels.Labels) { c.cacheMut.Lock() defer c.cacheMut.Unlock() - if fm == nil { + if lbls == nil { c.cache[originalID] = nil return } + newGlobal := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) c.cache[originalID] = &labelAndID{ - labels: fm.RawLabels(), - id: fm.GlobalRefID(), + labels: lbls, + id: newGlobal, } } diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index 07e50bde99ba..afd879aee583 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/go-kit/log" "github.com/grafana/agent/component" flow_relabel "github.com/grafana/agent/component/common/relabel" @@ -15,38 +17,37 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" ) func TestCache(t *testing.T) { relabeller := generateRelabel(t) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 0) - - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) require.Len(t, relabeller.cache, 1) - entry, found := relabeller.getFromCache(fm.GlobalRefID()) - newFm := prometheus.NewFlowMetric(entry.id, entry.labels, 0) + entry, found := relabeller.getFromCache(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)) require.True(t, found) require.NotNil(t, entry) - require.True(t, newFm.GlobalRefID() != fm.GlobalRefID()) + require.True( + t, + prometheus.GlobalRefMapping.GetOrAddGlobalRefID(entry.labels) != prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls), + ) } func TestEviction(t *testing.T) { relabeller := generateRelabel(t) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 0) - - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) require.Len(t, relabeller.cache, 1) - fmstale := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), math.Float64frombits(value.StaleNaN)) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fmstale}) + relabeller.relabel(math.Float64frombits(value.StaleNaN), lbls) require.Len(t, relabeller.cache, 0) } func TestUpdateReset(t *testing.T) { relabeller := generateRelabel(t) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 0) - - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) require.Len(t, relabeller.cache, 1) _ = relabeller.Update(Arguments{ MetricRelabelConfigs: []*flow_relabel.Config{}, @@ -54,53 +55,12 @@ func TestUpdateReset(t *testing.T) { require.Len(t, relabeller.cache, 0) } -func TestThatValueIsNotReused(t *testing.T) { - // The recval is used to ensure that the value isn't also cached with the labels and id. - var recval float64 - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - require.True(t, metrics[0].LabelsCopy().Has("new_label")) - require.True(t, metrics[0].Value() == recval) - }, - } - relabeller, err := New(component.Options{ - ID: "1", - Logger: util.TestLogger(t), - OnStateChange: func(e component.Exports) { - }, - Registerer: prom.NewRegistry(), - }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, - MetricRelabelConfigs: []*flow_relabel.Config{ - { - SourceLabels: []string{"__address__"}, - Regex: flow_relabel.Regexp(relabel.MustNewRegexp("(.+)")), - TargetLabel: "new_label", - Replacement: "new_value", - Action: "replace", - }, - }, - }) - require.NotNil(t, relabeller) - require.NoError(t, err) - - recval = 10 - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), recval) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) - - recval = 20 - newFm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), recval) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{newFm}) - require.True(t, fm.GlobalRefID() == newFm.GlobalRefID()) -} - func TestNil(t *testing.T) { - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - // This should never run - require.True(t, false) - }, - } + fanout, err := prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + require.True(t, false) + return ref, nil + }, nil, "1") + require.NoError(t, err) relabeller, err := New(component.Options{ ID: "1", Logger: util.TestLogger(t), @@ -108,7 +68,7 @@ func TestNil(t *testing.T) { }, Registerer: prom.NewRegistry(), }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, + ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ { SourceLabels: []string{"__address__"}, @@ -120,29 +80,29 @@ func TestNil(t *testing.T) { require.NotNil(t, relabeller) require.NoError(t, err) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 10) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) - - newFm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 20) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{newFm}) - require.True(t, fm.GlobalRefID() == newFm.GlobalRefID()) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) } func BenchmarkCache(b *testing.B) { - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - }, - } l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr)) - relabeller, _ := New(component.Options{ + fanout, err := prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + require.True(b, l.Has("new_label")) + return ref, nil + }, nil, "1") + require.NoError(b, err) + var entry storage.Appendable + _, _ = New(component.Options{ ID: "1", Logger: l, OnStateChange: func(e component.Exports) { + newE := e.(Exports) + entry = newE.Receiver }, Registerer: prom.NewRegistry(), }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, + ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ { SourceLabels: []string{"__address__"}, @@ -153,18 +113,22 @@ func BenchmarkCache(b *testing.B) { }, }, }) + + lbls := labels.FromStrings("__address__", "localhost") + app := entry.Appender(context.Background()) + for i := 0; i < b.N; i++ { - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), float64(i)) - relabeller.Receive(0, []*prometheus.FlowMetric{fm}) + app.Append(0, lbls, time.Now().UnixMilli(), 0) } + app.Commit() } func generateRelabel(t *testing.T) *Component { - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - require.True(t, metrics[0].LabelsCopy().Has("new_label")) - }, - } + fanout, err := prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + require.True(t, l.Has("new_label")) + return ref, nil + }, nil, "1") + require.NoError(t, err) relabeller, err := New(component.Options{ ID: "1", Logger: util.TestLogger(t), @@ -172,7 +136,7 @@ func generateRelabel(t *testing.T) *Component { }, Registerer: prom.NewRegistry(), }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, + ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ { SourceLabels: []string{"__address__"}, diff --git a/component/prometheus/remotewrite/remote_write.go b/component/prometheus/remotewrite/remote_write.go index ecc41f634df9..db879d17f5e9 100644 --- a/component/prometheus/remotewrite/remote_write.go +++ b/component/prometheus/remotewrite/remote_write.go @@ -8,10 +8,13 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/agent/component/prometheus" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/agent/component" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/build" "github.com/grafana/agent/pkg/metrics/wal" "github.com/prometheus/prometheus/model/timestamp" @@ -49,7 +52,7 @@ type Component struct { mut sync.RWMutex cfg Arguments - receiver *prometheus.Receiver + receiver *prometheus.Interceptor } // NewComponent creates a new prometheus.remote_write component. @@ -71,8 +74,22 @@ func NewComponent(o component.Options, c Arguments) (*Component, error) { remoteStore: remoteStore, storage: storage.NewFanout(o.Logger, walStorage, remoteStore), } - res.receiver = &prometheus.Receiver{Receive: res.Receive} - + res.receiver, err = prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + // Conversion is needed because remote_writes assume they own all the IDs, so if you have two remote_writes they will + // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id + // to a local (remote_write) id. + localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(ref)) + newref, nextErr := next.Append(storage.SeriesRef(localID), l, t, v) + // If there was no local id we need to propagate it. + if localID == 0 { + prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newref), l) + } + // return the original ref since this needs to be a globalref id and not the local one. + return ref, nextErr + }, res.storage, o.ID) + if err != nil { + return nil, err + } // Immediately export the receiver which remains the same for the component // lifetime. o.OnStateChange(Exports{Receiver: res.receiver}) @@ -179,36 +196,3 @@ func (c *Component) Update(newConfig component.Arguments) error { c.cfg = cfg return nil } - -// Receive implements the receiver.receive func that allows an array of metrics to be passed -func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) { - app := c.storage.Appender(context.Background()) - for _, m := range metricArr { - localID := prometheus.GlobalRefMapping.GetLocalRefID(c.opts.ID, m.GlobalRefID()) - // Currently it doesn't look like the storage interfaces mutate the labels, but thats not a strong - // promise. So this should be treated with care. - newLocal, err := app.Append(storage.SeriesRef(localID), m.RawLabels(), ts, m.Value()) - // Add link if there wasn't one before, and we received a valid local id - if localID == 0 && newLocal != 0 { - prometheus.GlobalRefMapping.GetOrAddLink(c.opts.ID, uint64(newLocal), m) - } - if err != nil { - _ = app.Rollback() - //TODO what should we log and behave? - level.Error(c.log).Log("err", err, "msg", "error receiving metrics", "component", c.opts.ID) - return - } - } - - err := app.Commit() - if err != nil { - level.Error(c.log).Log("msg", "failed to commit samples", "err", err) - } -} - -// Config implements Component. -func (c *Component) Config() Arguments { - c.mut.RLock() - defer c.mut.RUnlock() - return c.cfg -} diff --git a/component/prometheus/remotewrite/remote_write_test.go b/component/prometheus/remotewrite/remote_write_test.go index b88367e2c69e..5a24bdd4e8f4 100644 --- a/component/prometheus/remotewrite/remote_write_test.go +++ b/component/prometheus/remotewrite/remote_write_test.go @@ -1,13 +1,13 @@ package remotewrite_test import ( + "context" "fmt" "net/http" "net/http/httptest" "testing" "time" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/component/prometheus/remotewrite" "github.com/grafana/agent/pkg/flow/componenttest" "github.com/grafana/agent/pkg/river" @@ -81,10 +81,13 @@ func Test(t *testing.T) { // Send metrics to our component. These will be written to the WAL and // subsequently written to our HTTP server. rwExports := tc.Exports().(remotewrite.Exports) - rwExports.Receiver.Receive(sampleTimestamp, []*prometheus.FlowMetric{ - prometheus.NewFlowMetric(0, labels.FromStrings("foo", "bar"), 12), - prometheus.NewFlowMetric(0, labels.FromStrings("fizz", "buzz"), 34), - }) + appender := rwExports.Receiver.Appender(context.Background()) + _, err = appender.Append(0, labels.FromStrings("foo", "bar"), sampleTimestamp, 12) + require.NoError(t, err) + _, err = appender.Append(0, labels.FromStrings("fizz", "buzz"), sampleTimestamp, 34) + require.NoError(t, err) + err = appender.Commit() + require.NoError(t, err) expect := []prompb.TimeSeries{{ Labels: []prompb.Label{ diff --git a/component/prometheus/remotewrite/types.go b/component/prometheus/remotewrite/types.go index 318dc397fc4d..cd90739016c4 100644 --- a/component/prometheus/remotewrite/types.go +++ b/component/prometheus/remotewrite/types.go @@ -7,11 +7,11 @@ import ( "time" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/config" types "github.com/grafana/agent/component/common/config" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/river" common "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -185,7 +185,7 @@ func (o *WALOptions) UnmarshalRiver(f func(interface{}) error) error { // Exports are the set of fields exposed by the prometheus.remote_write // component. type Exports struct { - Receiver *prometheus.Receiver `river:"receiver,attr"` + Receiver storage.Appendable `river:"receiver,attr"` } func convertConfigs(cfg Arguments) (*config.Config, error) { diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index d10848f3e1a2..fd17311756e0 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -7,10 +7,11 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/storage" + "github.com/alecthomas/units" "github.com/go-kit/log/level" "github.com/grafana/agent/component" - fa "github.com/grafana/agent/component/common/appendable" component_config "github.com/grafana/agent/component/common/config" "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/prometheus" @@ -37,8 +38,8 @@ func init() { // Arguments holds values which are used to configure the prometheus.scrape // component. type Arguments struct { - Targets []discovery.Target `river:"targets,attr"` - ForwardTo []*prometheus.Receiver `river:"forward_to,attr"` + Targets []discovery.Target `river:"targets,attr"` + ForwardTo []storage.Appendable `river:"forward_to,attr"` // The job name to override the job label with. JobName string `river:"job_name,attr,optional"` @@ -109,7 +110,7 @@ type Component struct { mut sync.RWMutex args Arguments scraper *scrape.Manager - appendable *fa.FlowAppendable + appendable *prometheus.Fanout } var ( @@ -118,8 +119,7 @@ var ( // New creates a new prometheus.scrape component. func New(o component.Options, args Arguments) (*Component, error) { - flowAppendable := fa.NewFlowAppendable(args.ForwardTo...) - + flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID) scrapeOptions := &scrape.Options{ExtraMetrics: args.ExtraMetrics} scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable) c := &Component{ @@ -184,7 +184,7 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() c.args = newArgs - c.appendable.SetReceivers(newArgs.ForwardTo) + c.appendable.UpdateChildren(newArgs.ForwardTo) sc := getPromScrapeConfigs(c.opts.ID, newArgs) err := c.scraper.ApplyConfig(&config.Config{ diff --git a/component/prometheus/scrape/scrape_test.go b/component/prometheus/scrape/scrape_test.go index cfea21f15485..96f5bc7cd3aa 100644 --- a/component/prometheus/scrape/scrape_test.go +++ b/component/prometheus/scrape/scrape_test.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/agent/pkg/flow/logging" prometheus_client "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" ) @@ -22,7 +23,7 @@ func TestForwardingToAppendable(t *testing.T) { Registerer: prometheus_client.NewRegistry(), } - nilReceivers := []*prometheus.Receiver{nil, nil} + nilReceivers := []storage.Appendable{nil, nil} args := DefaultArguments args.ForwardTo = nilReceivers @@ -30,9 +31,6 @@ func TestForwardingToAppendable(t *testing.T) { s, err := New(opts, args) require.NoError(t, err) - // List the Appendable's receivers; they are nil. - require.Equal(t, nilReceivers, s.appendable.ListReceivers()) - // Forwarding samples to the nil receivers shouldn't fail. appender := s.appendable.Appender(context.Background()) _, err = appender.Append(0, labels.FromStrings("foo", "bar"), 0, 0) @@ -43,27 +41,26 @@ func TestForwardingToAppendable(t *testing.T) { // Update the component with a mock receiver; it should be passed along to the Appendable. var receivedTs int64 - var receivedSamples []*prometheus.FlowMetric - mockReceiver := []*prometheus.Receiver{ - { - Receive: func(t int64, m []*prometheus.FlowMetric) { - receivedTs = t - receivedSamples = m - }, + var receivedSamples labels.Labels + fanout, err := prometheus.NewInterceptor( + func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + receivedTs = t + receivedSamples = l + return ref, nil }, - } - - args.ForwardTo = mockReceiver + nil, + "1", + ) + require.NoError(t, err) + args.ForwardTo = []storage.Appendable{fanout} err = s.Update(args) require.NoError(t, err) - require.Equal(t, mockReceiver, s.appendable.ListReceivers()) - // Forwarding a sample to the mock receiver should succeed. appender = s.appendable.Appender(context.Background()) - sample := prometheus.NewFlowMetric(1, labels.FromStrings("foo", "bar"), 42.0) timestamp := time.Now().Unix() - _, err = appender.Append(0, sample.LabelsCopy(), timestamp, sample.Value()) + sample := labels.FromStrings("foo", "bar") + _, err = appender.Append(0, sample, timestamp, 42.0) require.NoError(t, err) err = appender.Commit() @@ -71,5 +68,5 @@ func TestForwardingToAppendable(t *testing.T) { require.Equal(t, receivedTs, timestamp) require.Len(t, receivedSamples, 1) - require.Equal(t, receivedSamples[0], sample) + require.Equal(t, receivedSamples, sample) }