From 61c728adcff1e8a2c8926ac968fff49976898e96 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Mon, 24 Oct 2022 17:49:46 -0400 Subject: [PATCH 01/12] initial commit of switching over to using appendable --- component/common/appendable/appendable.go | 15 +- component/prometheus/globalrefmap.go | 8 +- component/prometheus/relabel/relabel.go | 134 ++++++++++++------ .../prometheus/remotewrite/appendable.go | 60 ++++++++ .../prometheus/remotewrite/remote_write.go | 13 +- component/prometheus/remotewrite/types.go | 4 +- component/prometheus/scrape/fanout.go | 67 +++++++++ component/prometheus/scrape/scrape.go | 16 +-- 8 files changed, 242 insertions(+), 75 deletions(-) create mode 100644 component/prometheus/remotewrite/appendable.go create mode 100644 component/prometheus/scrape/fanout.go diff --git a/component/common/appendable/appendable.go b/component/common/appendable/appendable.go index c34900ce7bbd..1e313dcd5246 100644 --- a/component/common/appendable/appendable.go +++ b/component/common/appendable/appendable.go @@ -1,17 +1,6 @@ package appendable -import ( - "context" - "sync" - - "github.com/grafana/agent/component/prometheus" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/model/value" - "github.com/prometheus/prometheus/storage" -) - +/* // FlowMetric is a wrapper around a single sample without the timestamp. type FlowMetric struct { Labels labels.Labels @@ -108,4 +97,4 @@ func (app *flowAppender) Commit() error { func (app *flowAppender) Rollback() error { app.buffer = make(map[int64][]*prometheus.FlowMetric) return nil -} +}*/ diff --git a/component/prometheus/globalrefmap.go b/component/prometheus/globalrefmap.go index 8196c558f3bf..54f2dcc6d369 100644 --- a/component/prometheus/globalrefmap.go +++ b/component/prometheus/globalrefmap.go @@ -44,7 +44,7 @@ func newGlobalRefMap() *GlobalRefMap { } // GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id. -func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *FlowMetric) uint64 { +func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 { g.mut.Lock() defer g.mut.Unlock() @@ -59,7 +59,7 @@ func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *F g.mappings[componentID] = m } - labelHash := fm.labels.Hash() + labelHash := lbls.Hash() globalID, found := g.labelsHashToGlobal[labelHash] if found { m.localToGlobal[localRefID] = globalID @@ -79,6 +79,10 @@ func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 { g.mut.Lock() defer g.mut.Unlock() + if l == nil { + return 0 + } + labelHash := l.Hash() globalID, found := g.labelsHashToGlobal[labelHash] if found { diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index f1d76cec47bc..5352234b650b 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -4,6 +4,11 @@ import ( "context" "sync" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/metadata" + + "github.com/prometheus/prometheus/storage" + "github.com/grafana/agent/component" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/prometheus" @@ -29,7 +34,7 @@ func init() { // component. type Arguments struct { // Where the relabelled metrics should be forwarded to. - ForwardTo []*prometheus.Receiver `river:"forward_to,attr"` + ForwardTo []storage.Appendable `river:"forward_to,attr"` // The relabelling rules to apply to each metric before it's forwarded. MetricRelabelConfigs []*flow_relabel.Config `river:"rule,block,optional"` @@ -37,7 +42,7 @@ type Arguments struct { // Exports holds values which are exported by the prometheus.relabel component. type Exports struct { - Receiver *prometheus.Receiver `river:"receiver,attr"` + Receiver storage.Appendable `river:"receiver,attr"` } // Component implements the prometheus.relabel component. @@ -45,15 +50,29 @@ type Component struct { mut sync.RWMutex opts component.Options mrc []*relabel.Config - forwardto []*prometheus.Receiver - receiver *prometheus.Receiver + forwardto []storage.Appendable metricsProcessed prometheus_client.Counter cacheMut sync.RWMutex cache map[uint64]*labelAndID } -var _ component.Component = (*Component)(nil) +var ( + _ component.Component = (*Component)(nil) + _ storage.Appendable = (*Component)(nil) + _ storage.Appender = (*appender)(nil) +) + +func (c *Component) Appender(ctx context.Context) storage.Appender { + app := &appender{ + children: make([]storage.Appender, 0), + relabel: c.Relabel, + } + for _, forward := range c.forwardto { + app.children = append(app.children, forward.Appender(ctx)) + } + return app +} // New creates a new prometheus.relabel component. func New(o component.Options, args Arguments) (*Component, error) { @@ -61,7 +80,7 @@ func New(o component.Options, args Arguments) (*Component, error) { opts: o, cache: make(map[uint64]*labelAndID), } - c.receiver = &prometheus.Receiver{Receive: c.Receive} + c.forwardto = args.ForwardTo c.metricsProcessed = prometheus_client.NewCounter(prometheus_client.CounterOpts{ Name: "agent_prometheus_relabel_metrics_processed", Help: "Total number of metrics processed", @@ -95,49 +114,37 @@ func (c *Component) Update(args component.Arguments) error { c.clearCache() c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs) c.forwardto = newArgs.ForwardTo - c.opts.OnStateChange(Exports{Receiver: c.receiver}) + c.opts.OnStateChange(Exports{Receiver: c}) return nil } -// Receive implements the receiver.Receive func that allows an array of metrics -// to be passed around. -func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) { +func (c *Component) Relabel(val float64, lbls labels.Labels) labels.Labels { c.mut.RLock() defer c.mut.RUnlock() - - relabelledMetrics := make([]*prometheus.FlowMetric, 0) - for _, m := range metricArr { - // Relabel may return the original flowmetric if no changes applied, nil if everything was removed or an entirely new flowmetric. - var relabelledFm *prometheus.FlowMetric - lbls, found := c.getFromCache(m.GlobalRefID()) - if found { - // If lbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels - if lbls != nil { - relabelledFm = prometheus.NewFlowMetric(lbls.id, lbls.labels, m.Value()) - } - } else { - relabelledFm = m.Relabel(c.mrc...) - c.addToCache(m.GlobalRefID(), relabelledFm) + globalRef := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) + var relabelled labels.Labels + newLbls, found := c.getFromCache(globalRef) + if found { + // If newLbls is nil but cache entry was found then we want to keep the value nil, if it's not we want to reuse the labels + if newLbls != nil { + relabelled = newLbls.labels } - - // If stale remove from the cache, the reason we don't exit early is so the stale value can propagate. - // TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple. - // In the future the global ref cache may have some hooks to allow notification of when caches should be evicted. - if value.IsStaleNaN(m.Value()) { - c.deleteFromCache(m.GlobalRefID()) - } - if relabelledFm == nil { - continue - } - relabelledMetrics = append(relabelledMetrics, relabelledFm) + } else { + processedLabels := relabel.Process(lbls, c.mrc...) + c.addToCache(globalRef, processedLabels) } - if len(relabelledMetrics) == 0 { - return + + // If stale remove from the cache, the reason we don't exit early is so the stale value can propagate. + // TODO: (@mattdurham) This caching can leak and likely needs a timed eviction at some point, but this is simple. + // In the future the global ref cache may have some hooks to allow notification of when caches should be evicted. + if value.IsStaleNaN(val) { + c.deleteFromCache(globalRef) } - for _, forward := range c.forwardto { - forward.Receive(ts, relabelledMetrics) + if relabelled == nil { + return nil } + return newLbls.labels } func (c *Component) getFromCache(id uint64) (*labelAndID, bool) { @@ -162,17 +169,18 @@ func (c *Component) clearCache() { c.cache = make(map[uint64]*labelAndID) } -func (c *Component) addToCache(originalID uint64, fm *prometheus.FlowMetric) { +func (c *Component) addToCache(originalID uint64, lbls labels.Labels) { c.cacheMut.Lock() defer c.cacheMut.Unlock() - if fm == nil { + if lbls == nil { c.cache[originalID] = nil return } + newGlobal := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) c.cache[originalID] = &labelAndID{ - labels: fm.RawLabels(), - id: fm.GlobalRefID(), + labels: lbls, + id: newGlobal, } } @@ -182,3 +190,43 @@ type labelAndID struct { labels labels.Labels id uint64 } + +type appender struct { + children []storage.Appender + relabel func(val float64, lbls labels.Labels) labels.Labels +} + +func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + newLbls := a.relabel(v, l) + if newLbls == nil { + return ref, nil + } + for _, x := range a.children { + _, _ = x.Append(ref, newLbls, t, v) + } + return ref, nil +} + +func (a *appender) Commit() error { + for _, x := range a.children { + _ = x.Commit() + } + return nil +} + +func (a *appender) Rollback() error { + for _, x := range a.children { + _ = x.Rollback() + } + return nil +} + +func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + // TODO implement me + panic("implement me") +} + +func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + // TODO implement me + panic("implement me") +} diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go new file mode 100644 index 000000000000..37cc157eb9a9 --- /dev/null +++ b/component/prometheus/remotewrite/appendable.go @@ -0,0 +1,60 @@ +package remotewrite + +import ( + "github.com/grafana/agent/component/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" + "golang.org/x/net/context" +) + +var _ storage.Appendable = (*appendable)(nil) +var _ storage.Appender = (*appender)(nil) + +type appendable struct { + inner storage.Appendable + componentID string +} + +func (a *appendable) Appender(ctx context.Context) storage.Appender { + app := &appender{ + child: a.inner.Appender(ctx), + componentID: a.componentID, + } + return app +} + +type appender struct { + child storage.Appender + componentID string +} + +func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + // Conversion is needed because remote_writes assume they own all the IDs, so if you have two remote_writes they will + // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id + // to a local (remote_write) id. + localID := prometheus.GlobalRefMapping.GetLocalRefID(a.componentID, uint64(ref)) + if localID == 0 { + localID = prometheus.GlobalRefMapping.GetOrAddLink(a.componentID, localID, l) + } + return a.child.Append(storage.SeriesRef(localID), l, t, v) +} + +func (a *appender) Commit() error { + return a.child.Commit() +} + +func (a *appender) Rollback() error { + return a.child.Rollback() +} + +func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + //TODO implement me + panic("implement me") +} + +func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + //TODO implement me + panic("implement me") +} diff --git a/component/prometheus/remotewrite/remote_write.go b/component/prometheus/remotewrite/remote_write.go index 039a73321205..34dc544f3d16 100644 --- a/component/prometheus/remotewrite/remote_write.go +++ b/component/prometheus/remotewrite/remote_write.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/agent/component" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/build" "github.com/grafana/agent/pkg/metrics/wal" "github.com/prometheus/prometheus/model/timestamp" @@ -48,8 +47,6 @@ type Component struct { mut sync.RWMutex cfg Arguments - - receiver *prometheus.Receiver } // NewComponent creates a new prometheus.remote_write component. @@ -71,7 +68,6 @@ func NewComponent(o component.Options, c Arguments) (*Component, error) { remoteStore: remoteStore, storage: storage.NewFanout(o.Logger, walStorage, remoteStore), } - res.receiver = &prometheus.Receiver{Receive: res.Receive} if err := res.Update(c); err != nil { return nil, err } @@ -84,7 +80,10 @@ var _ component.Component = (*Component)(nil) // Run implements Component. func (c *Component) Run(ctx context.Context) error { - c.opts.OnStateChange(Exports{Receiver: c.receiver}) + c.opts.OnStateChange(Exports{Receiver: &appendable{ + inner: c.storage, + componentID: c.opts.ID, + }}) defer func() { level.Debug(c.log).Log("msg", "closing storage") err := c.storage.Close() @@ -177,7 +176,7 @@ func (c *Component) Update(newConfig component.Arguments) error { } // Receive implements the receiver.receive func that allows an array of metrics to be passed -func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) { +/*func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) { app := c.storage.Appender(context.Background()) for _, m := range metricArr { localID := prometheus.GlobalRefMapping.GetLocalRefID(c.opts.ID, m.GlobalRefID()) @@ -200,7 +199,7 @@ func (c *Component) Receive(ts int64, metricArr []*prometheus.FlowMetric) { if err != nil { level.Error(c.log).Log("msg", "failed to commit samples", "err", err) } -} +}*/ // Config implements Component. func (c *Component) Config() Arguments { diff --git a/component/prometheus/remotewrite/types.go b/component/prometheus/remotewrite/types.go index 318dc397fc4d..cd90739016c4 100644 --- a/component/prometheus/remotewrite/types.go +++ b/component/prometheus/remotewrite/types.go @@ -7,11 +7,11 @@ import ( "time" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/config" types "github.com/grafana/agent/component/common/config" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/river" common "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -185,7 +185,7 @@ func (o *WALOptions) UnmarshalRiver(f func(interface{}) error) error { // Exports are the set of fields exposed by the prometheus.remote_write // component. type Exports struct { - Receiver *prometheus.Receiver `river:"receiver,attr"` + Receiver storage.Appendable `river:"receiver,attr"` } func convertConfigs(cfg Arguments) (*config.Config, error) { diff --git a/component/prometheus/scrape/fanout.go b/component/prometheus/scrape/fanout.go new file mode 100644 index 000000000000..55de898013d4 --- /dev/null +++ b/component/prometheus/scrape/fanout.go @@ -0,0 +1,67 @@ +package scrape + +import ( + "context" + + "github.com/grafana/agent/component/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + + "github.com/prometheus/prometheus/storage" +) + +var _ storage.Appendable = (*fanout)(nil) + +type fanout struct { + children []storage.Appendable +} + +func (f *fanout) Appender(ctx context.Context) storage.Appender { + app := &appender{children: make([]storage.Appender, 0)} + for _, x := range f.children { + app.children = append(app.children, x.Appender(ctx)) + } + return app + +} + +var _ storage.Appender = (*appender)(nil) + +type appender struct { + children []storage.Appender +} + +func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if ref == 0 { + ref = storage.SeriesRef(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(l)) + } + for _, x := range a.children { + _, _ = x.Append(ref, l, t, v) + } + return ref, nil +} + +func (a *appender) Commit() error { + for _, x := range a.children { + _ = x.Commit() + } + return nil +} + +func (a *appender) Rollback() error { + for _, x := range a.children { + _, _ = x, a.Rollback() + } + return nil +} + +func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + //TODO implement me + panic("implement me") +} + +func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + //TODO implement me + panic("implement me") +} diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index d10848f3e1a2..e4e9e69991ec 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -7,13 +7,13 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/storage" + "github.com/alecthomas/units" "github.com/go-kit/log/level" "github.com/grafana/agent/component" - fa "github.com/grafana/agent/component/common/appendable" component_config "github.com/grafana/agent/component/common/config" "github.com/grafana/agent/component/discovery" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/build" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -37,8 +37,8 @@ func init() { // Arguments holds values which are used to configure the prometheus.scrape // component. type Arguments struct { - Targets []discovery.Target `river:"targets,attr"` - ForwardTo []*prometheus.Receiver `river:"forward_to,attr"` + Targets []discovery.Target `river:"targets,attr"` + ForwardTo []storage.Appendable `river:"forward_to,attr"` // The job name to override the job label with. JobName string `river:"job_name,attr,optional"` @@ -109,7 +109,7 @@ type Component struct { mut sync.RWMutex args Arguments scraper *scrape.Manager - appendable *fa.FlowAppendable + appendable *fanout } var ( @@ -118,8 +118,8 @@ var ( // New creates a new prometheus.scrape component. func New(o component.Options, args Arguments) (*Component, error) { - flowAppendable := fa.NewFlowAppendable(args.ForwardTo...) - + //flowAppendable := fa.NewFlowAppendable(args.ForwardTo...) + flowAppendable := &fanout{children: args.ForwardTo} scrapeOptions := &scrape.Options{ExtraMetrics: args.ExtraMetrics} scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable) c := &Component{ @@ -184,7 +184,7 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() c.args = newArgs - c.appendable.SetReceivers(newArgs.ForwardTo) + c.appendable.children = newArgs.ForwardTo sc := getPromScrapeConfigs(c.opts.ID, newArgs) err := c.scraper.ApplyConfig(&config.Config{ From 68a3b4a4ef0501ab399e2a063a7a739b2dee41e6 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Tue, 25 Oct 2022 09:02:47 -0400 Subject: [PATCH 02/12] comments --- component/common/appendable/appendable.go | 100 ------------------ component/prometheus/relabel/relabel.go | 6 ++ .../prometheus/remotewrite/appendable.go | 6 ++ 3 files changed, 12 insertions(+), 100 deletions(-) delete mode 100644 component/common/appendable/appendable.go diff --git a/component/common/appendable/appendable.go b/component/common/appendable/appendable.go deleted file mode 100644 index 1e313dcd5246..000000000000 --- a/component/common/appendable/appendable.go +++ /dev/null @@ -1,100 +0,0 @@ -package appendable - -/* -// FlowMetric is a wrapper around a single sample without the timestamp. -type FlowMetric struct { - Labels labels.Labels - Value float64 -} - -// FlowAppendable is a flow-specific implementation of an Appender. -type FlowAppendable struct { - mut sync.RWMutex - receivers []*prometheus.Receiver -} - -// NewFlowAppendable initializes the appendable. -func NewFlowAppendable(receivers ...*prometheus.Receiver) *FlowAppendable { - return &FlowAppendable{ - receivers: receivers, - } -} - -type flowAppender struct { - buffer map[int64][]*prometheus.FlowMetric // Though mostly a map of 1 item, this allows it to work if more than one TS gets added - receivers []*prometheus.Receiver -} - -// Appender implements the Prometheus Appendable interface. -func (app *FlowAppendable) Appender(_ context.Context) storage.Appender { - app.mut.RLock() - defer app.mut.RUnlock() - - return &flowAppender{ - buffer: make(map[int64][]*prometheus.FlowMetric), - receivers: app.receivers, - } -} - -// SetReceivers defines the list of receivers for this appendable. -func (app *FlowAppendable) SetReceivers(receivers []*prometheus.Receiver) { - app.mut.Lock() - app.receivers = receivers - app.mut.Unlock() -} - -// ListReceivers is a test method for exposing the Appender's receivers. -func (app *FlowAppendable) ListReceivers() []*prometheus.Receiver { - app.mut.RLock() - defer app.mut.RUnlock() - return app.receivers -} - -func (app *flowAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - if len(app.receivers) == 0 { - return 0, nil - } - _, found := app.buffer[t] - if !found { - set := make([]*prometheus.FlowMetric, 0) - app.buffer[t] = set - } - // If ref is 0 then lets grab a global id - if ref == 0 { - ref = storage.SeriesRef(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(l)) - } - // If it is stale then we can remove it - if value.IsStaleNaN(v) { - prometheus.GlobalRefMapping.AddStaleMarker(uint64(ref), l) - } else { - prometheus.GlobalRefMapping.RemoveStaleMarker(uint64(ref)) - } - app.buffer[t] = append(app.buffer[t], prometheus.NewFlowMetric(uint64(ref), l, v)) - return ref, nil -} - -func (app *flowAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - return 0, nil -} - -func (app *flowAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - return 0, nil -} - -func (app *flowAppender) Commit() error { - for _, r := range app.receivers { - for ts, metrics := range app.buffer { - if r == nil || r.Receive == nil { - continue - } - r.Receive(ts, metrics) - } - } - app.buffer = make(map[int64][]*prometheus.FlowMetric) - return nil -} - -func (app *flowAppender) Rollback() error { - app.buffer = make(map[int64][]*prometheus.FlowMetric) - return nil -}*/ diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index 5352234b650b..93a0dc5740cc 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -122,6 +122,7 @@ func (c *Component) Update(args component.Arguments) error { func (c *Component) Relabel(val float64, lbls labels.Labels) labels.Labels { c.mut.RLock() defer c.mut.RUnlock() + globalRef := prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls) var relabelled labels.Labels newLbls, found := c.getFromCache(globalRef) @@ -196,6 +197,7 @@ type appender struct { relabel func(val float64, lbls labels.Labels) labels.Labels } +// Append statisfies the appender interface. func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { newLbls := a.relabel(v, l) if newLbls == nil { @@ -207,6 +209,7 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo return ref, nil } +// Commit satisfies the appender interface. func (a *appender) Commit() error { for _, x := range a.children { _ = x.Commit() @@ -214,6 +217,7 @@ func (a *appender) Commit() error { return nil } +// Rollback satisfies the appender interface. func (a *appender) Rollback() error { for _, x := range a.children { _ = x.Rollback() @@ -221,11 +225,13 @@ func (a *appender) Rollback() error { return nil } +// AppendExemplar satisfies the appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { // TODO implement me panic("implement me") } +// UpdateMetadata satisfies the appender interface. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { // TODO implement me panic("implement me") diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go index 37cc157eb9a9..48d8dd591505 100644 --- a/component/prometheus/remotewrite/appendable.go +++ b/component/prometheus/remotewrite/appendable.go @@ -17,6 +17,7 @@ type appendable struct { componentID string } +// Appender satisfies the Appendable interface. func (a *appendable) Appender(ctx context.Context) storage.Appender { app := &appender{ child: a.inner.Appender(ctx), @@ -30,6 +31,7 @@ type appender struct { componentID string } +// Append satisfies the Appender interface. func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { // Conversion is needed because remote_writes assume they own all the IDs, so if you have two remote_writes they will // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id @@ -41,19 +43,23 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo return a.child.Append(storage.SeriesRef(localID), l, t, v) } +// Commit satisfies the Appender interface. func (a *appender) Commit() error { return a.child.Commit() } +// Rollback satisfies the Appender interface. func (a *appender) Rollback() error { return a.child.Rollback() } +// AppendExemplate satisfies the Appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { //TODO implement me panic("implement me") } +// UpdateMetadata satisfies the Appender interface. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { //TODO implement me panic("implement me") From c6268ac5aaf08e9b65b61d7a772c54a7f35e99b1 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 07:18:20 -0400 Subject: [PATCH 03/12] simplify fanout --- component/prometheus/fanout.go | 89 ++++++++++++++ component/prometheus/globalrefmap_test.go | 25 ++-- component/prometheus/receiver.go | 64 ---------- component/prometheus/receiver_test.go | 43 ------- component/prometheus/relabel/relabel.go | 71 ++--------- component/prometheus/relabel/relabel_test.go | 112 ++++++------------ .../prometheus/remotewrite/appendable.go | 9 +- .../remotewrite/remote_write_test.go | 13 +- component/prometheus/scrape/fanout.go | 67 ----------- component/prometheus/scrape/scrape.go | 10 +- component/prometheus/scrape/scrape_test.go | 31 ++--- 11 files changed, 174 insertions(+), 360 deletions(-) create mode 100644 component/prometheus/fanout.go delete mode 100644 component/prometheus/receiver.go delete mode 100644 component/prometheus/receiver_test.go delete mode 100644 component/prometheus/scrape/fanout.go diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go new file mode 100644 index 000000000000..0a711c314313 --- /dev/null +++ b/component/prometheus/fanout.go @@ -0,0 +1,89 @@ +package prometheus + +import ( + "context" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + + "github.com/prometheus/prometheus/storage" +) + +var _ storage.Appendable = (*Fanout)(nil) + +// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. +type Fanout struct { + // Intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated. + // Intercept shouuld be thread safe and can be called across appenders. + Intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) + + // Children is where to fan out. + Children []storage.Appendable +} + +// Appender satisfies the Appendable interface. +func (f *Fanout) Appender(ctx context.Context) storage.Appender { + app := &appender{children: make([]storage.Appender, len(f.Children))} + for i, x := range f.Children { + app.children[i] = x.Appender(ctx) + } + return app +} + +var _ storage.Appender = (*appender)(nil) + +type appender struct { + children []storage.Appender + intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) +} + +// Append satisfies the Appender interface. +func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if ref == 0 { + ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + } + nRef := ref + nL := l + nT := t + nV := v + if a.intercept != nil { + var err error + nRef, nL, nT, nV, err = a.intercept(ref, l, t, v) + if err != nil { + return 0, err + } + } + for _, x := range a.children { + _, _ = x.Append(nRef, nL, nT, nV) + } + return ref, nil +} + +// Commit satisfies the Appender interface. +func (a *appender) Commit() error { + for _, x := range a.children { + _ = x.Commit() + } + return nil +} + +// Rollback satisifies the Appender interface. +func (a *appender) Rollback() error { + for _, x := range a.children { + _, _ = x, a.Rollback() + } + return nil +} + +// AppendExemplar satisfies the Appender interface. +func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + // TODO implement me + panic("implement me") +} + +// UpdateMetadata satisifies the Appender interface. +func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + // TODO implement me + panic("implement me") +} diff --git a/component/prometheus/globalrefmap_test.go b/component/prometheus/globalrefmap_test.go index d5ce1d83b398..096a52a29fe8 100644 --- a/component/prometheus/globalrefmap_test.go +++ b/component/prometheus/globalrefmap_test.go @@ -48,8 +48,7 @@ func TestAddingLocalMapping(t *testing.T) { }) globalID := mapping.GetOrAddGlobalRefID(l) - fm := NewFlowMetric(globalID, l, 0) - shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm) + shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l) require.True(t, globalID == shouldBeSameGlobalID) require.Len(t, mapping.labelsHashToGlobal, 1) require.Len(t, mapping.mappings, 1) @@ -67,9 +66,8 @@ func TestAddingLocalMappings(t *testing.T) { }) globalID := mapping.GetOrAddGlobalRefID(l) - fm := NewFlowMetric(globalID, l, 0) - shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm) - shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm) + shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l) + shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l) require.True(t, globalID == shouldBeSameGlobalID) require.True(t, globalID == shouldBeSameGlobalID2) require.Len(t, mapping.labelsHashToGlobal, 1) @@ -92,10 +90,8 @@ func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) { Value: "test", }) - fm := NewFlowMetric(1, l, 0) - - shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm) - shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm) + shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l) + shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l) require.True(t, shouldBeSameGlobalID2 == shouldBeSameGlobalID) require.Len(t, mapping.labelsHashToGlobal, 1) require.Len(t, mapping.mappings, 2) @@ -122,11 +118,8 @@ func TestStaleness(t *testing.T) { Value: "test2", }) - fm := NewFlowMetric(0, l, 0) - fm2 := NewFlowMetric(0, l2, 0) - - global1 := mapping.GetOrAddLink("1", 1, fm) - _ = mapping.GetOrAddLink("2", 1, fm2) + global1 := mapping.GetOrAddLink("1", 1, l) + _ = mapping.GetOrAddLink("2", 1, l2) mapping.AddStaleMarker(global1, l) require.Len(t, mapping.staleGlobals, 1) require.Len(t, mapping.labelsHashToGlobal, 2) @@ -145,9 +138,7 @@ func TestRemovingStaleness(t *testing.T) { Value: "test", }) - fm := NewFlowMetric(0, l, 0) - - global1 := mapping.GetOrAddLink("1", 1, fm) + global1 := mapping.GetOrAddLink("1", 1, l) mapping.AddStaleMarker(global1, l) require.Len(t, mapping.staleGlobals, 1) mapping.RemoveStaleMarker(global1) diff --git a/component/prometheus/receiver.go b/component/prometheus/receiver.go deleted file mode 100644 index d0f132933c4b..000000000000 --- a/component/prometheus/receiver.go +++ /dev/null @@ -1,64 +0,0 @@ -package prometheus - -import ( - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" -) - -// Receiver is used to pass an array of metrics to another receiver -type Receiver struct { - // metrics should be considered immutable - Receive func(timestamp int64, metrics []*FlowMetric) -} - -// RiverCapsule marks receivers as a capsule. -func (r Receiver) RiverCapsule() {} - -// FlowMetric is a wrapper around a single metric without the timestamp. -type FlowMetric struct { - globalRefID uint64 - labels labels.Labels - value float64 -} - -// NewFlowMetric instantiates a new flow metric -func NewFlowMetric(globalRefID uint64, lbls labels.Labels, value float64) *FlowMetric { - // Always ensure we have a valid global ref id - if globalRefID == 0 { - globalRefID = GlobalRefMapping.GetOrAddGlobalRefID(lbls) - } - return &FlowMetric{ - globalRefID: globalRefID, - labels: lbls, - value: value, - } -} - -// GlobalRefID Retrieves the GlobalRefID -func (fw *FlowMetric) GlobalRefID() uint64 { return fw.globalRefID } - -// Value returns the value -func (fw *FlowMetric) Value() float64 { return fw.value } - -// LabelsCopy returns a copy of the labels structure -func (fw *FlowMetric) LabelsCopy() labels.Labels { - return fw.labels.Copy() -} - -// RawLabels returns the actual underlying labels that SHOULD be treated as immutable. Usage of this -// must be very careful to ensure that nothing that consume this mutates labels in anyway. -func (fw *FlowMetric) RawLabels() labels.Labels { - return fw.labels -} - -// Relabel applies normal prometheus relabel rules and returns a flow metric. NOTE this may return itself. -func (fw *FlowMetric) Relabel(cfgs ...*relabel.Config) *FlowMetric { - retLbls := relabel.Process(fw.labels, cfgs...) - if retLbls == nil { - return nil - } - if retLbls.Hash() == fw.labels.Hash() && labels.Equal(retLbls, fw.labels) { - return fw - } - return NewFlowMetric(0, retLbls, fw.value) -} diff --git a/component/prometheus/receiver_test.go b/component/prometheus/receiver_test.go deleted file mode 100644 index 642c883901c9..000000000000 --- a/component/prometheus/receiver_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package prometheus - -import ( - "testing" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/relabel" - "github.com/stretchr/testify/require" -) - -func TestRelabel(t *testing.T) { - fm := NewFlowMetric(0, labels.FromStrings("key", "value"), 0) - require.True(t, fm.globalRefID != 0) - rg, _ := relabel.NewRegexp("(.*)") - newfm := fm.Relabel(&relabel.Config{ - Replacement: "${1}_new", - Action: "replace", - TargetLabel: "new", - Regex: rg, - }) - require.Len(t, fm.labels, 1) - require.True(t, fm.labels.Has("key")) - - require.Len(t, newfm.labels, 2) - require.True(t, newfm.labels.Has("new")) -} - -func TestRelabelTheSame(t *testing.T) { - fm := NewFlowMetric(0, labels.FromStrings("key", "value"), 0) - require.True(t, fm.globalRefID != 0) - rg, _ := relabel.NewRegexp("bad") - newfm := fm.Relabel(&relabel.Config{ - Replacement: "${1}_new", - Action: "replace", - TargetLabel: "new", - Regex: rg, - }) - require.Len(t, fm.labels, 1) - require.True(t, fm.labels.Has("key")) - require.Len(t, newfm.labels, 1) - require.True(t, newfm.globalRefID == fm.globalRefID) - require.True(t, labels.Equal(newfm.labels, fm.labels)) -} diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index 93a0dc5740cc..a8dbb51f4355 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -4,9 +4,6 @@ import ( "context" "sync" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/storage" "github.com/grafana/agent/component" @@ -59,21 +56,8 @@ type Component struct { var ( _ component.Component = (*Component)(nil) - _ storage.Appendable = (*Component)(nil) - _ storage.Appender = (*appender)(nil) ) -func (c *Component) Appender(ctx context.Context) storage.Appender { - app := &appender{ - children: make([]storage.Appender, 0), - relabel: c.Relabel, - } - for _, forward := range c.forwardto { - app.children = append(app.children, forward.Appender(ctx)) - } - return app -} - // New creates a new prometheus.relabel component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ @@ -114,12 +98,18 @@ func (c *Component) Update(args component.Arguments) error { c.clearCache() c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs) c.forwardto = newArgs.ForwardTo - c.opts.OnStateChange(Exports{Receiver: c}) + c.opts.OnStateChange(Exports{Receiver: &prometheus.Fanout{ + Children: newArgs.ForwardTo, + Intercept: func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + newLbl := c.relabel(v, l) + return ref, newLbl, t, v, nil + }}, + }) return nil } -func (c *Component) Relabel(val float64, lbls labels.Labels) labels.Labels { +func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { c.mut.RLock() defer c.mut.RUnlock() @@ -191,48 +181,3 @@ type labelAndID struct { labels labels.Labels id uint64 } - -type appender struct { - children []storage.Appender - relabel func(val float64, lbls labels.Labels) labels.Labels -} - -// Append statisfies the appender interface. -func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - newLbls := a.relabel(v, l) - if newLbls == nil { - return ref, nil - } - for _, x := range a.children { - _, _ = x.Append(ref, newLbls, t, v) - } - return ref, nil -} - -// Commit satisfies the appender interface. -func (a *appender) Commit() error { - for _, x := range a.children { - _ = x.Commit() - } - return nil -} - -// Rollback satisfies the appender interface. -func (a *appender) Rollback() error { - for _, x := range a.children { - _ = x.Rollback() - } - return nil -} - -// AppendExemplar satisfies the appender interface. -func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - // TODO implement me - panic("implement me") -} - -// UpdateMetadata satisfies the appender interface. -func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - // TODO implement me - panic("implement me") -} diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index 07e50bde99ba..2826b9071a2b 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -4,7 +4,6 @@ import ( "math" "os" "testing" - "time" "github.com/go-kit/log" "github.com/grafana/agent/component" @@ -15,38 +14,37 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/value" + "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" ) func TestCache(t *testing.T) { relabeller := generateRelabel(t) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 0) - - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) require.Len(t, relabeller.cache, 1) - entry, found := relabeller.getFromCache(fm.GlobalRefID()) - newFm := prometheus.NewFlowMetric(entry.id, entry.labels, 0) + entry, found := relabeller.getFromCache(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls)) require.True(t, found) require.NotNil(t, entry) - require.True(t, newFm.GlobalRefID() != fm.GlobalRefID()) + require.True( + t, + prometheus.GlobalRefMapping.GetOrAddGlobalRefID(entry.labels) != prometheus.GlobalRefMapping.GetOrAddGlobalRefID(lbls), + ) } func TestEviction(t *testing.T) { relabeller := generateRelabel(t) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 0) - - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) require.Len(t, relabeller.cache, 1) - fmstale := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), math.Float64frombits(value.StaleNaN)) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fmstale}) + relabeller.relabel(math.Float64frombits(value.StaleNaN), lbls) require.Len(t, relabeller.cache, 0) } func TestUpdateReset(t *testing.T) { relabeller := generateRelabel(t) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 0) - - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) require.Len(t, relabeller.cache, 1) _ = relabeller.Update(Arguments{ MetricRelabelConfigs: []*flow_relabel.Config{}, @@ -54,51 +52,11 @@ func TestUpdateReset(t *testing.T) { require.Len(t, relabeller.cache, 0) } -func TestThatValueIsNotReused(t *testing.T) { - // The recval is used to ensure that the value isn't also cached with the labels and id. - var recval float64 - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - require.True(t, metrics[0].LabelsCopy().Has("new_label")) - require.True(t, metrics[0].Value() == recval) - }, - } - relabeller, err := New(component.Options{ - ID: "1", - Logger: util.TestLogger(t), - OnStateChange: func(e component.Exports) { - }, - Registerer: prom.NewRegistry(), - }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, - MetricRelabelConfigs: []*flow_relabel.Config{ - { - SourceLabels: []string{"__address__"}, - Regex: flow_relabel.Regexp(relabel.MustNewRegexp("(.+)")), - TargetLabel: "new_label", - Replacement: "new_value", - Action: "replace", - }, - }, - }) - require.NotNil(t, relabeller) - require.NoError(t, err) - - recval = 10 - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), recval) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) - - recval = 20 - newFm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), recval) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{newFm}) - require.True(t, fm.GlobalRefID() == newFm.GlobalRefID()) -} - func TestNil(t *testing.T) { - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - // This should never run + fanout := &prometheus.Fanout{ + Intercept: func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { require.True(t, false) + return ref, l, tt, v, nil }, } relabeller, err := New(component.Options{ @@ -108,7 +66,7 @@ func TestNil(t *testing.T) { }, Registerer: prom.NewRegistry(), }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, + ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ { SourceLabels: []string{"__address__"}, @@ -120,21 +78,21 @@ func TestNil(t *testing.T) { require.NotNil(t, relabeller) require.NoError(t, err) - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 10) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{fm}) - - newFm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), 20) - relabeller.Receive(time.Now().Unix(), []*prometheus.FlowMetric{newFm}) - require.True(t, fm.GlobalRefID() == newFm.GlobalRefID()) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) } func BenchmarkCache(b *testing.B) { - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - }, - } l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr)) + fanout := &prometheus.Fanout{ + Intercept: func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + if !l.Has("new_label") { + panic("must have new label") + } + return ref, l, t, v, nil + }, + } relabeller, _ := New(component.Options{ ID: "1", Logger: l, @@ -142,7 +100,7 @@ func BenchmarkCache(b *testing.B) { }, Registerer: prom.NewRegistry(), }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, + ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ { SourceLabels: []string{"__address__"}, @@ -153,16 +111,18 @@ func BenchmarkCache(b *testing.B) { }, }, }) + for i := 0; i < b.N; i++ { - fm := prometheus.NewFlowMetric(0, labels.FromStrings("__address__", "localhost"), float64(i)) - relabeller.Receive(0, []*prometheus.FlowMetric{fm}) + lbls := labels.FromStrings("__address__", "localhost") + relabeller.relabel(0, lbls) } } func generateRelabel(t *testing.T) *Component { - rec := &prometheus.Receiver{ - Receive: func(timestamp int64, metrics []*prometheus.FlowMetric) { - require.True(t, metrics[0].LabelsCopy().Has("new_label")) + fanout := &prometheus.Fanout{ + Intercept: func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + require.True(t, l.Has("new_label")) + return ref, l, tt, v, nil }, } relabeller, err := New(component.Options{ @@ -172,7 +132,7 @@ func generateRelabel(t *testing.T) *Component { }, Registerer: prom.NewRegistry(), }, Arguments{ - ForwardTo: []*prometheus.Receiver{rec}, + ForwardTo: []storage.Appendable{fanout}, MetricRelabelConfigs: []*flow_relabel.Config{ { SourceLabels: []string{"__address__"}, diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go index 48d8dd591505..9cb49d28c71e 100644 --- a/component/prometheus/remotewrite/appendable.go +++ b/component/prometheus/remotewrite/appendable.go @@ -37,10 +37,13 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id // to a local (remote_write) id. localID := prometheus.GlobalRefMapping.GetLocalRefID(a.componentID, uint64(ref)) - if localID == 0 { - localID = prometheus.GlobalRefMapping.GetOrAddLink(a.componentID, localID, l) + noLocalID := localID == 0 + newref, err := a.child.Append(storage.SeriesRef(localID), l, t, v) + // If there was no local id we need to propagate it. + if noLocalID { + prometheus.GlobalRefMapping.GetOrAddLink(a.componentID, uint64(newref), l) } - return a.child.Append(storage.SeriesRef(localID), l, t, v) + return ref, err } // Commit satisfies the Appender interface. diff --git a/component/prometheus/remotewrite/remote_write_test.go b/component/prometheus/remotewrite/remote_write_test.go index b88367e2c69e..4ac343468625 100644 --- a/component/prometheus/remotewrite/remote_write_test.go +++ b/component/prometheus/remotewrite/remote_write_test.go @@ -1,13 +1,13 @@ package remotewrite_test import ( + "context" "fmt" "net/http" "net/http/httptest" "testing" "time" - "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/component/prometheus/remotewrite" "github.com/grafana/agent/pkg/flow/componenttest" "github.com/grafana/agent/pkg/river" @@ -81,10 +81,13 @@ func Test(t *testing.T) { // Send metrics to our component. These will be written to the WAL and // subsequently written to our HTTP server. rwExports := tc.Exports().(remotewrite.Exports) - rwExports.Receiver.Receive(sampleTimestamp, []*prometheus.FlowMetric{ - prometheus.NewFlowMetric(0, labels.FromStrings("foo", "bar"), 12), - prometheus.NewFlowMetric(0, labels.FromStrings("fizz", "buzz"), 34), - }) + appender := rwExports.Receiver.Appender(context.Background()) + _, err = appender.Append(0, labels.FromStrings("foo", "bar"), time.Now().Unix(), 12) + require.NoError(t, err) + _, err = appender.Append(0, labels.FromStrings("fizz", "buzz"), time.Now().Unix(), 34) + require.NoError(t, err) + err = appender.Commit() + require.NoError(t, err) expect := []prompb.TimeSeries{{ Labels: []prompb.Label{ diff --git a/component/prometheus/scrape/fanout.go b/component/prometheus/scrape/fanout.go deleted file mode 100644 index 55de898013d4..000000000000 --- a/component/prometheus/scrape/fanout.go +++ /dev/null @@ -1,67 +0,0 @@ -package scrape - -import ( - "context" - - "github.com/grafana/agent/component/prometheus" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - - "github.com/prometheus/prometheus/storage" -) - -var _ storage.Appendable = (*fanout)(nil) - -type fanout struct { - children []storage.Appendable -} - -func (f *fanout) Appender(ctx context.Context) storage.Appender { - app := &appender{children: make([]storage.Appender, 0)} - for _, x := range f.children { - app.children = append(app.children, x.Appender(ctx)) - } - return app - -} - -var _ storage.Appender = (*appender)(nil) - -type appender struct { - children []storage.Appender -} - -func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - if ref == 0 { - ref = storage.SeriesRef(prometheus.GlobalRefMapping.GetOrAddGlobalRefID(l)) - } - for _, x := range a.children { - _, _ = x.Append(ref, l, t, v) - } - return ref, nil -} - -func (a *appender) Commit() error { - for _, x := range a.children { - _ = x.Commit() - } - return nil -} - -func (a *appender) Rollback() error { - for _, x := range a.children { - _, _ = x, a.Rollback() - } - return nil -} - -func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - //TODO implement me - panic("implement me") -} - -func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - //TODO implement me - panic("implement me") -} diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index e4e9e69991ec..7a7ebec8f7d2 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/agent/component" component_config "github.com/grafana/agent/component/common/config" "github.com/grafana/agent/component/discovery" + "github.com/grafana/agent/component/prometheus" "github.com/grafana/agent/pkg/build" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" @@ -109,7 +110,7 @@ type Component struct { mut sync.RWMutex args Arguments scraper *scrape.Manager - appendable *fanout + appendable *prometheus.Fanout } var ( @@ -118,8 +119,9 @@ var ( // New creates a new prometheus.scrape component. func New(o component.Options, args Arguments) (*Component, error) { - //flowAppendable := fa.NewFlowAppendable(args.ForwardTo...) - flowAppendable := &fanout{children: args.ForwardTo} + flowAppendable := &prometheus.Fanout{ + Children: args.ForwardTo, + } scrapeOptions := &scrape.Options{ExtraMetrics: args.ExtraMetrics} scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable) c := &Component{ @@ -184,7 +186,7 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() c.args = newArgs - c.appendable.children = newArgs.ForwardTo + c.appendable = &prometheus.Fanout{Children: newArgs.ForwardTo} sc := getPromScrapeConfigs(c.opts.ID, newArgs) err := c.scraper.ApplyConfig(&config.Config{ diff --git a/component/prometheus/scrape/scrape_test.go b/component/prometheus/scrape/scrape_test.go index cfea21f15485..ac5fc74c3360 100644 --- a/component/prometheus/scrape/scrape_test.go +++ b/component/prometheus/scrape/scrape_test.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/agent/pkg/flow/logging" prometheus_client "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" ) @@ -22,7 +23,7 @@ func TestForwardingToAppendable(t *testing.T) { Registerer: prometheus_client.NewRegistry(), } - nilReceivers := []*prometheus.Receiver{nil, nil} + nilReceivers := []storage.Appendable{nil, nil} args := DefaultArguments args.ForwardTo = nilReceivers @@ -31,7 +32,7 @@ func TestForwardingToAppendable(t *testing.T) { require.NoError(t, err) // List the Appendable's receivers; they are nil. - require.Equal(t, nilReceivers, s.appendable.ListReceivers()) + require.Equal(t, nilReceivers, s.appendable.Children) // Forwarding samples to the nil receivers shouldn't fail. appender := s.appendable.Appender(context.Background()) @@ -43,27 +44,21 @@ func TestForwardingToAppendable(t *testing.T) { // Update the component with a mock receiver; it should be passed along to the Appendable. var receivedTs int64 - var receivedSamples []*prometheus.FlowMetric - mockReceiver := []*prometheus.Receiver{ - { - Receive: func(t int64, m []*prometheus.FlowMetric) { - receivedTs = t - receivedSamples = m - }, - }, - } - - args.ForwardTo = mockReceiver + var receivedSamples labels.Labels + fanout := &prometheus.Fanout{Intercept: func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + receivedTs = t + receivedSamples = l + return ref, l, t, v, nil + }} + args.ForwardTo = []storage.Appendable{fanout} err = s.Update(args) require.NoError(t, err) - require.Equal(t, mockReceiver, s.appendable.ListReceivers()) - // Forwarding a sample to the mock receiver should succeed. appender = s.appendable.Appender(context.Background()) - sample := prometheus.NewFlowMetric(1, labels.FromStrings("foo", "bar"), 42.0) timestamp := time.Now().Unix() - _, err = appender.Append(0, sample.LabelsCopy(), timestamp, sample.Value()) + sample := labels.FromStrings("foo", "bar") + _, err = appender.Append(0, sample, timestamp, 42.0) require.NoError(t, err) err = appender.Commit() @@ -71,5 +66,5 @@ func TestForwardingToAppendable(t *testing.T) { require.Equal(t, receivedTs, timestamp) require.Len(t, receivedSamples, 1) - require.Equal(t, receivedSamples[0], sample) + require.Equal(t, receivedSamples, sample) } From 5be2521fba500adc23a62443e27c35996585e057 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 09:16:25 -0400 Subject: [PATCH 04/12] Switch to using a common fanout appendable. --- component/prometheus/fanout.go | 55 ++++++++++++++++--- component/prometheus/relabel/relabel.go | 17 +++--- component/prometheus/relabel/relabel_test.go | 49 +++++++++-------- .../prometheus/remotewrite/appendable.go | 3 +- .../remotewrite/remote_write_test.go | 4 +- component/prometheus/scrape/scrape.go | 6 +- component/prometheus/scrape/scrape_test.go | 7 +-- 7 files changed, 87 insertions(+), 54 deletions(-) diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 0a711c314313..fa9cc9b54b61 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -2,6 +2,7 @@ package prometheus import ( "context" + "sync" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" @@ -12,20 +13,51 @@ import ( var _ storage.Appendable = (*Fanout)(nil) +type intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) + // Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. type Fanout struct { - // Intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated. + mut sync.RWMutex + // intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated. // Intercept shouuld be thread safe and can be called across appenders. - Intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) + intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) + + // children is where to fan out. + children []storage.Appendable + + // ComponentID is what component this belongs to. + componentID string +} + +// NewFanout creates a fanout appendable. +func NewFanout(inter intercept, children []storage.Appendable, componentID string) *Fanout { + return &Fanout{ + intercept: inter, + children: children, + componentID: componentID, + } +} - // Children is where to fan out. - Children []storage.Appendable +func (f *Fanout) UpdateChildren(children []storage.Appendable) { + f.mut.Lock() + defer f.mut.Unlock() + f.children = children } // Appender satisfies the Appendable interface. func (f *Fanout) Appender(ctx context.Context) storage.Appender { - app := &appender{children: make([]storage.Appender, len(f.Children))} - for i, x := range f.Children { + f.mut.RLock() + defer f.mut.RUnlock() + + app := &appender{ + children: make([]storage.Appender, len(f.children)), + intercept: f.intercept, + componentID: f.componentID, + } + for i, x := range f.children { + if x == nil { + continue + } app.children[i] = x.Appender(ctx) } return app @@ -34,8 +66,9 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { var _ storage.Appender = (*appender)(nil) type appender struct { - children []storage.Appender - intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) + children []storage.Appender + componentID string + intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) } // Append satisfies the Appender interface. @@ -55,6 +88,9 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo } } for _, x := range a.children { + if x == nil || nL == nil { + continue + } _, _ = x.Append(nRef, nL, nT, nV) } return ref, nil @@ -63,6 +99,9 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo // Commit satisfies the Appender interface. func (a *appender) Commit() error { for _, x := range a.children { + if x == nil { + continue + } _ = x.Commit() } return nil diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index a8dbb51f4355..2691b2e6137b 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -98,13 +98,10 @@ func (c *Component) Update(args component.Arguments) error { c.clearCache() c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs) c.forwardto = newArgs.ForwardTo - c.opts.OnStateChange(Exports{Receiver: &prometheus.Fanout{ - Children: newArgs.ForwardTo, - Intercept: func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { - newLbl := c.relabel(v, l) - return ref, newLbl, t, v, nil - }}, - }) + c.opts.OnStateChange(Exports{Receiver: prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + newLbl := c.relabel(v, l) + return ref, newLbl, t, v, nil + }, newArgs.ForwardTo, c.opts.ID)}) return nil } @@ -122,8 +119,8 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { relabelled = newLbls.labels } } else { - processedLabels := relabel.Process(lbls, c.mrc...) - c.addToCache(globalRef, processedLabels) + relabelled = relabel.Process(lbls, c.mrc...) + c.addToCache(globalRef, relabelled) } // If stale remove from the cache, the reason we don't exit early is so the stale value can propagate. @@ -135,7 +132,7 @@ func (c *Component) relabel(val float64, lbls labels.Labels) labels.Labels { if relabelled == nil { return nil } - return newLbls.labels + return relabelled } func (c *Component) getFromCache(id uint64) (*labelAndID, bool) { diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index 2826b9071a2b..40c1dbeb9624 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -4,6 +4,9 @@ import ( "math" "os" "testing" + "time" + + "golang.org/x/net/context" "github.com/go-kit/log" "github.com/grafana/agent/component" @@ -53,12 +56,10 @@ func TestUpdateReset(t *testing.T) { } func TestNil(t *testing.T) { - fanout := &prometheus.Fanout{ - Intercept: func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { - require.True(t, false) - return ref, l, tt, v, nil - }, - } + fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + require.True(t, false) + return ref, l, tt, v, nil + }, nil, "1") relabeller, err := New(component.Options{ ID: "1", Logger: util.TestLogger(t), @@ -85,18 +86,19 @@ func TestNil(t *testing.T) { func BenchmarkCache(b *testing.B) { l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr)) - fanout := &prometheus.Fanout{ - Intercept: func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { - if !l.Has("new_label") { - panic("must have new label") - } - return ref, l, t, v, nil - }, - } - relabeller, _ := New(component.Options{ + fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + if !l.Has("new_label") { + panic("must have new label") + } + return ref, l, tt, v, nil + }, nil, "1") + var entry storage.Appendable + _, _ = New(component.Options{ ID: "1", Logger: l, OnStateChange: func(e component.Exports) { + newE := e.(Exports) + entry = newE.Receiver }, Registerer: prom.NewRegistry(), }, Arguments{ @@ -112,19 +114,20 @@ func BenchmarkCache(b *testing.B) { }, }) + lbls := labels.FromStrings("__address__", "localhost") + app := entry.Appender(context.Background()) + for i := 0; i < b.N; i++ { - lbls := labels.FromStrings("__address__", "localhost") - relabeller.relabel(0, lbls) + app.Append(0, lbls, time.Now().UnixMilli(), 0) } + app.Commit() } func generateRelabel(t *testing.T) *Component { - fanout := &prometheus.Fanout{ - Intercept: func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { - require.True(t, l.Has("new_label")) - return ref, l, tt, v, nil - }, - } + fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + require.True(t, l.Has("new_label")) + return ref, l, tt, v, nil + }, nil, "1") relabeller, err := New(component.Options{ ID: "1", Logger: util.TestLogger(t), diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go index 9cb49d28c71e..ea32ef54418b 100644 --- a/component/prometheus/remotewrite/appendable.go +++ b/component/prometheus/remotewrite/appendable.go @@ -37,10 +37,9 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id // to a local (remote_write) id. localID := prometheus.GlobalRefMapping.GetLocalRefID(a.componentID, uint64(ref)) - noLocalID := localID == 0 newref, err := a.child.Append(storage.SeriesRef(localID), l, t, v) // If there was no local id we need to propagate it. - if noLocalID { + if localID == 0 { prometheus.GlobalRefMapping.GetOrAddLink(a.componentID, uint64(newref), l) } return ref, err diff --git a/component/prometheus/remotewrite/remote_write_test.go b/component/prometheus/remotewrite/remote_write_test.go index 4ac343468625..5a24bdd4e8f4 100644 --- a/component/prometheus/remotewrite/remote_write_test.go +++ b/component/prometheus/remotewrite/remote_write_test.go @@ -82,9 +82,9 @@ func Test(t *testing.T) { // subsequently written to our HTTP server. rwExports := tc.Exports().(remotewrite.Exports) appender := rwExports.Receiver.Appender(context.Background()) - _, err = appender.Append(0, labels.FromStrings("foo", "bar"), time.Now().Unix(), 12) + _, err = appender.Append(0, labels.FromStrings("foo", "bar"), sampleTimestamp, 12) require.NoError(t, err) - _, err = appender.Append(0, labels.FromStrings("fizz", "buzz"), time.Now().Unix(), 34) + _, err = appender.Append(0, labels.FromStrings("fizz", "buzz"), sampleTimestamp, 34) require.NoError(t, err) err = appender.Commit() require.NoError(t, err) diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index 7a7ebec8f7d2..a4b75bdba9d1 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -119,9 +119,7 @@ var ( // New creates a new prometheus.scrape component. func New(o component.Options, args Arguments) (*Component, error) { - flowAppendable := &prometheus.Fanout{ - Children: args.ForwardTo, - } + flowAppendable := prometheus.NewFanout(nil, args.ForwardTo, o.ID) scrapeOptions := &scrape.Options{ExtraMetrics: args.ExtraMetrics} scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable) c := &Component{ @@ -186,7 +184,7 @@ func (c *Component) Update(args component.Arguments) error { defer c.mut.Unlock() c.args = newArgs - c.appendable = &prometheus.Fanout{Children: newArgs.ForwardTo} + c.appendable.UpdateChildren(newArgs.ForwardTo) sc := getPromScrapeConfigs(c.opts.ID, newArgs) err := c.scraper.ApplyConfig(&config.Config{ diff --git a/component/prometheus/scrape/scrape_test.go b/component/prometheus/scrape/scrape_test.go index ac5fc74c3360..c94ccd991646 100644 --- a/component/prometheus/scrape/scrape_test.go +++ b/component/prometheus/scrape/scrape_test.go @@ -31,9 +31,6 @@ func TestForwardingToAppendable(t *testing.T) { s, err := New(opts, args) require.NoError(t, err) - // List the Appendable's receivers; they are nil. - require.Equal(t, nilReceivers, s.appendable.Children) - // Forwarding samples to the nil receivers shouldn't fail. appender := s.appendable.Appender(context.Background()) _, err = appender.Append(0, labels.FromStrings("foo", "bar"), 0, 0) @@ -45,11 +42,11 @@ func TestForwardingToAppendable(t *testing.T) { // Update the component with a mock receiver; it should be passed along to the Appendable. var receivedTs int64 var receivedSamples labels.Labels - fanout := &prometheus.Fanout{Intercept: func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { receivedTs = t receivedSamples = l return ref, l, t, v, nil - }} + }, nil, "1") args.ForwardTo = []storage.Appendable{fanout} err = s.Update(args) require.NoError(t, err) From 908a80a5cc4227f29ae2b5d93ea3b8ddd65f9371 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 09:37:46 -0400 Subject: [PATCH 05/12] Merge changes --- component/prometheus/relabel/relabel.go | 2 +- component/prometheus/remotewrite/remote_write.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index 81dbc887cf16..ac4a127ede88 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -47,7 +47,7 @@ type Component struct { mut sync.RWMutex opts component.Options mrc []*relabel.Config - receiver *prometheus.Fanout + receiver *prometheus.Fanout metricsProcessed prometheus_client.Counter cacheMut sync.RWMutex diff --git a/component/prometheus/remotewrite/remote_write.go b/component/prometheus/remotewrite/remote_write.go index 30a8fa0bb4a1..a08648194c6a 100644 --- a/component/prometheus/remotewrite/remote_write.go +++ b/component/prometheus/remotewrite/remote_write.go @@ -60,7 +60,6 @@ func NewComponent(o component.Options, c Arguments) (*Component, error) { return nil, err } - remoteLogger := log.With(o.Logger, "subcomponent", "rw") remoteStore := remote.NewStorage(remoteLogger, o.Registerer, startTime, dataPath, remoteFlushDeadline, nil) @@ -182,4 +181,3 @@ func (c *Component) Update(newConfig component.Arguments) error { c.cfg = cfg return nil } - From a00b91615c1d80a7d7a4ac0c63954979f576afa6 Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 09:40:58 -0400 Subject: [PATCH 06/12] Fix lint --- component/prometheus/fanout.go | 1 + 1 file changed, 1 insertion(+) diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index fa9cc9b54b61..4c270d5bc567 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -38,6 +38,7 @@ func NewFanout(inter intercept, children []storage.Appendable, componentID strin } } +// UpdateChildren allows changing of the children of the fanout. func (f *Fanout) UpdateChildren(children []storage.Appendable) { f.mut.Lock() defer f.mut.Unlock() From 82d8af35f5e30ed2a9db934be4b334226f2b61da Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 09:42:04 -0400 Subject: [PATCH 07/12] Add check --- component/prometheus/fanout.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 4c270d5bc567..c6bc653e16b4 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -111,6 +111,9 @@ func (a *appender) Commit() error { // Rollback satisifies the Appender interface. func (a *appender) Rollback() error { for _, x := range a.children { + if x == nil { + continue + } _, _ = x, a.Rollback() } return nil From 531f99eff71648a58aa8b4c149c9fbc6f81383db Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 11:37:36 -0400 Subject: [PATCH 08/12] remove panics --- component/prometheus/fanout.go | 7 +++---- component/prometheus/remotewrite/appendable.go | 8 ++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index c6bc653e16b4..322196f183ce 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -2,6 +2,7 @@ package prometheus import ( "context" + "fmt" "sync" "github.com/prometheus/prometheus/model/exemplar" @@ -121,12 +122,10 @@ func (a *appender) Rollback() error { // AppendExemplar satisfies the Appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - // TODO implement me - panic("implement me") + return 0, fmt.Errorf("AppendExemplar not supported yet.") } // UpdateMetadata satisifies the Appender interface. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - // TODO implement me - panic("implement me") + return 0, fmt.Errorf("UpdateMetadata not supported yet.") } diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go index ea32ef54418b..4c8e6e757221 100644 --- a/component/prometheus/remotewrite/appendable.go +++ b/component/prometheus/remotewrite/appendable.go @@ -1,6 +1,8 @@ package remotewrite import ( + "fmt" + "github.com/grafana/agent/component/prometheus" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" @@ -57,12 +59,10 @@ func (a *appender) Rollback() error { // AppendExemplate satisfies the Appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - //TODO implement me - panic("implement me") + return 0, fmt.Errorf("AppendExemplar not supported yet.") } // UpdateMetadata satisfies the Appender interface. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - //TODO implement me - panic("implement me") + return 0, fmt.Errorf("UpdateMetadata not supported yet.") } From 0b48fc541f3ea1f051f52d988a05a8f2031289ba Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 14:48:28 -0400 Subject: [PATCH 09/12] PR feedback --- component/prometheus/fanout.go | 28 ++++++++----------- component/prometheus/globalrefmap.go | 1 + .../prometheus/remotewrite/appendable.go | 4 +-- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 322196f183ce..37078ed09c55 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -56,11 +56,11 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { intercept: f.intercept, componentID: f.componentID, } - for i, x := range f.children { + for _, x := range f.children { if x == nil { continue } - app.children[i] = x.Appender(ctx) + app.children = append(app.children, x.Appender(ctx)) } return app } @@ -78,22 +78,22 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo if ref == 0 { ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) } - nRef := ref - nL := l - nT := t - nV := v + newRef := ref + newLabels := l + newTimestamp := t + newValue := v if a.intercept != nil { var err error - nRef, nL, nT, nV, err = a.intercept(ref, l, t, v) + newRef, newLabels, newTimestamp, newValue, err = a.intercept(ref, l, t, v) if err != nil { return 0, err } } for _, x := range a.children { - if x == nil || nL == nil { + if newLabels == nil { continue } - _, _ = x.Append(nRef, nL, nT, nV) + _, _ = x.Append(newRef, newLabels, newTimestamp, newValue) } return ref, nil } @@ -101,9 +101,6 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo // Commit satisfies the Appender interface. func (a *appender) Commit() error { for _, x := range a.children { - if x == nil { - continue - } _ = x.Commit() } return nil @@ -112,9 +109,6 @@ func (a *appender) Commit() error { // Rollback satisifies the Appender interface. func (a *appender) Rollback() error { for _, x := range a.children { - if x == nil { - continue - } _, _ = x, a.Rollback() } return nil @@ -122,10 +116,10 @@ func (a *appender) Rollback() error { // AppendExemplar satisfies the Appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - return 0, fmt.Errorf("AppendExemplar not supported yet.") + return 0, fmt.Errorf("appendExemplar not supported yet") } // UpdateMetadata satisifies the Appender interface. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - return 0, fmt.Errorf("UpdateMetadata not supported yet.") + return 0, fmt.Errorf("updateMetadata not supported yet") } diff --git a/component/prometheus/globalrefmap.go b/component/prometheus/globalrefmap.go index 54f2dcc6d369..06aa88a732f7 100644 --- a/component/prometheus/globalrefmap.go +++ b/component/prometheus/globalrefmap.go @@ -79,6 +79,7 @@ func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 { g.mut.Lock() defer g.mut.Unlock() + // Guard against bad input. if l == nil { return 0 } diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go index 4c8e6e757221..337836722002 100644 --- a/component/prometheus/remotewrite/appendable.go +++ b/component/prometheus/remotewrite/appendable.go @@ -59,10 +59,10 @@ func (a *appender) Rollback() error { // AppendExemplate satisfies the Appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - return 0, fmt.Errorf("AppendExemplar not supported yet.") + return 0, fmt.Errorf("appendExemplar not supported yet") } // UpdateMetadata satisfies the Appender interface. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - return 0, fmt.Errorf("UpdateMetadata not supported yet.") + return 0, fmt.Errorf("updateMetadata not supported yet") } From 42cea0538b69acdccdff7cd6f4fc21e7c1bd2c2f Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Wed, 26 Oct 2022 15:06:31 -0400 Subject: [PATCH 10/12] fix error --- component/prometheus/fanout.go | 2 +- component/prometheus/relabel/relabel_test.go | 4 +--- component/prometheus/remotewrite/appendable.go | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 37078ed09c55..55707a78bc34 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -52,7 +52,7 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { defer f.mut.RUnlock() app := &appender{ - children: make([]storage.Appender, len(f.children)), + children: make([]storage.Appender, 0), intercept: f.intercept, componentID: f.componentID, } diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index 40c1dbeb9624..c880e8e13d01 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -87,9 +87,7 @@ func BenchmarkCache(b *testing.B) { l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr)) fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { - if !l.Has("new_label") { - panic("must have new label") - } + require.True(b, l.Has("new_label")) return ref, l, tt, v, nil }, nil, "1") var entry storage.Appendable diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go index 337836722002..f324665df734 100644 --- a/component/prometheus/remotewrite/appendable.go +++ b/component/prometheus/remotewrite/appendable.go @@ -57,7 +57,7 @@ func (a *appender) Rollback() error { return a.child.Rollback() } -// AppendExemplate satisfies the Appender interface. +// AppendExemplar satisfies the Appender interface. func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { return 0, fmt.Errorf("appendExemplar not supported yet") } From d06eabffc1350960a652ed15f6085c414785f546 Mon Sep 17 00:00:00 2001 From: mattdurham Date: Mon, 31 Oct 2022 10:18:21 -0400 Subject: [PATCH 11/12] separate fanout and intercept patterns (#2447) * separate fanout and intercept patterns * testing flow * Add middleware style next to interceptor. --- component/prometheus/fanout.go | 28 +---- component/prometheus/interceptor.go | 116 +++++++++++++++++++ component/prometheus/relabel/relabel.go | 18 ++- component/prometheus/relabel/relabel_test.go | 15 ++- component/prometheus/scrape/scrape.go | 2 +- component/prometheus/scrape/scrape_test.go | 15 ++- 6 files changed, 151 insertions(+), 43 deletions(-) create mode 100644 component/prometheus/interceptor.go diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 55707a78bc34..54163b77447b 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -14,26 +14,18 @@ import ( var _ storage.Appendable = (*Fanout)(nil) -type intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) - // Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends. type Fanout struct { mut sync.RWMutex - // intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated. - // Intercept shouuld be thread safe and can be called across appenders. - intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) - // children is where to fan out. children []storage.Appendable - // ComponentID is what component this belongs to. componentID string } // NewFanout creates a fanout appendable. -func NewFanout(inter intercept, children []storage.Appendable, componentID string) *Fanout { +func NewFanout(children []storage.Appendable, componentID string) *Fanout { return &Fanout{ - intercept: inter, children: children, componentID: componentID, } @@ -53,7 +45,6 @@ func (f *Fanout) Appender(ctx context.Context) storage.Appender { app := &appender{ children: make([]storage.Appender, 0), - intercept: f.intercept, componentID: f.componentID, } for _, x := range f.children { @@ -70,7 +61,6 @@ var _ storage.Appender = (*appender)(nil) type appender struct { children []storage.Appender componentID string - intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) } // Append satisfies the Appender interface. @@ -78,22 +68,8 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo if ref == 0 { ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) } - newRef := ref - newLabels := l - newTimestamp := t - newValue := v - if a.intercept != nil { - var err error - newRef, newLabels, newTimestamp, newValue, err = a.intercept(ref, l, t, v) - if err != nil { - return 0, err - } - } for _, x := range a.children { - if newLabels == nil { - continue - } - _, _ = x.Append(newRef, newLabels, newTimestamp, newValue) + _, _ = x.Append(ref, l, t, v) } return ref, nil } diff --git a/component/prometheus/interceptor.go b/component/prometheus/interceptor.go new file mode 100644 index 000000000000..dca9c88341f5 --- /dev/null +++ b/component/prometheus/interceptor.go @@ -0,0 +1,116 @@ +package prometheus + +import ( + "context" + "fmt" + "sync" + + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +// Intercept func allows interceptor owners to inject custom behavior. +type Intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) + +// Interceptor supports the concept of an appendable/appender that you can add a func to be called before +// the values are sent to the child appendable/append. +type Interceptor struct { + mut sync.RWMutex + // intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated. + // Intercept shouuld be thread safe and can be called across appenders. + intercept Intercept + // next is where to send the next command. + next storage.Appendable + + // ComponentID is what component this belongs to. + componentID string +} + +// NewInterceptor creates a interceptor appendable. +func NewInterceptor(inter Intercept, next storage.Appendable, componentID string) (*Interceptor, error) { + if inter == nil { + return nil, fmt.Errorf("intercept cannot be null for component %s", componentID) + } + return &Interceptor{ + intercept: inter, + next: next, + componentID: componentID, + }, nil +} + +// UpdateChild allows changing of the child of the interceptor. +func (f *Interceptor) UpdateChild(child storage.Appendable) { + f.mut.Lock() + defer f.mut.Unlock() + + f.next = child +} + +// Appender satisfies the Appendable interface. +func (f *Interceptor) Appender(ctx context.Context) storage.Appender { + f.mut.RLock() + defer f.mut.RUnlock() + + app := &interceptappender{ + intercept: f.intercept, + componentID: f.componentID, + } + if f.next != nil { + app.child = f.next.Appender(ctx) + } + return app +} + +var _ storage.Appender = (*appender)(nil) + +type interceptappender struct { + child storage.Appender + componentID string + intercept Intercept +} + +// Append satisfies the Appender interface. +func (a *interceptappender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + if ref == 0 { + ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) + } + return a.intercept(ref, l, t, v, a.child) +} + +// Commit satisfies the Appender interface. +func (a *interceptappender) Commit() error { + if a.child == nil { + return nil + } + return a.child.Commit() +} + +// Rollback satisifies the Appender interface. +func (a *interceptappender) Rollback() error { + if a.child == nil { + return nil + } + return a.child.Rollback() +} + +// AppendExemplar satisfies the Appender interface. +func (a *interceptappender) AppendExemplar( + ref storage.SeriesRef, + l labels.Labels, + e exemplar.Exemplar, +) (storage.SeriesRef, error) { + + return 0, fmt.Errorf("appendExemplar not supported yet") +} + +// UpdateMetadata satisifies the Appender interface. +func (a *interceptappender) UpdateMetadata( + ref storage.SeriesRef, + l labels.Labels, + m metadata.Metadata, +) (storage.SeriesRef, error) { + + return 0, fmt.Errorf("updateMetadata not supported yet") +} diff --git a/component/prometheus/relabel/relabel.go b/component/prometheus/relabel/relabel.go index ac4a127ede88..d928810010bc 100644 --- a/component/prometheus/relabel/relabel.go +++ b/component/prometheus/relabel/relabel.go @@ -47,8 +47,9 @@ type Component struct { mut sync.RWMutex opts component.Options mrc []*relabel.Config - receiver *prometheus.Fanout + receiver *prometheus.Interceptor metricsProcessed prometheus_client.Counter + fanout *prometheus.Fanout cacheMut sync.RWMutex cache map[uint64]*labelAndID @@ -74,11 +75,18 @@ func New(o component.Options, args Arguments) (*Component, error) { return nil, err } - c.receiver = prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + c.fanout = prometheus.NewFanout(args.ForwardTo, o.ID) + c.receiver, err = prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { newLbl := c.relabel(v, l) - return ref, newLbl, t, v, nil - }, args.ForwardTo, c.opts.ID) + if newLbl == nil { + return 0, nil + } + return next.Append(0, newLbl, t, v) + }, c.fanout, c.opts.ID) + if err != nil { + return nil, err + } // Immediately export the receiver which remains the same for the component // lifetime. o.OnStateChange(Exports{Receiver: c.receiver}) @@ -106,7 +114,7 @@ func (c *Component) Update(args component.Arguments) error { newArgs := args.(Arguments) c.clearCache() c.mrc = flow_relabel.ComponentToPromRelabelConfigs(newArgs.MetricRelabelConfigs) - c.receiver.UpdateChildren(newArgs.ForwardTo) + c.fanout.UpdateChildren(newArgs.ForwardTo) c.opts.OnStateChange(Exports{Receiver: c.receiver}) return nil diff --git a/component/prometheus/relabel/relabel_test.go b/component/prometheus/relabel/relabel_test.go index c880e8e13d01..afd879aee583 100644 --- a/component/prometheus/relabel/relabel_test.go +++ b/component/prometheus/relabel/relabel_test.go @@ -56,10 +56,11 @@ func TestUpdateReset(t *testing.T) { } func TestNil(t *testing.T) { - fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + fanout, err := prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64, next storage.Appender) (storage.SeriesRef, error) { require.True(t, false) - return ref, l, tt, v, nil + return ref, nil }, nil, "1") + require.NoError(t, err) relabeller, err := New(component.Options{ ID: "1", Logger: util.TestLogger(t), @@ -86,10 +87,11 @@ func TestNil(t *testing.T) { func BenchmarkCache(b *testing.B) { l := log.NewSyncLogger(log.NewLogfmtLogger(os.Stderr)) - fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + fanout, err := prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64, next storage.Appender) (storage.SeriesRef, error) { require.True(b, l.Has("new_label")) - return ref, l, tt, v, nil + return ref, nil }, nil, "1") + require.NoError(b, err) var entry storage.Appendable _, _ = New(component.Options{ ID: "1", @@ -122,10 +124,11 @@ func BenchmarkCache(b *testing.B) { } func generateRelabel(t *testing.T) *Component { - fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { + fanout, err := prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, tt int64, v float64, next storage.Appender) (storage.SeriesRef, error) { require.True(t, l.Has("new_label")) - return ref, l, tt, v, nil + return ref, nil }, nil, "1") + require.NoError(t, err) relabeller, err := New(component.Options{ ID: "1", Logger: util.TestLogger(t), diff --git a/component/prometheus/scrape/scrape.go b/component/prometheus/scrape/scrape.go index a4b75bdba9d1..fd17311756e0 100644 --- a/component/prometheus/scrape/scrape.go +++ b/component/prometheus/scrape/scrape.go @@ -119,7 +119,7 @@ var ( // New creates a new prometheus.scrape component. func New(o component.Options, args Arguments) (*Component, error) { - flowAppendable := prometheus.NewFanout(nil, args.ForwardTo, o.ID) + flowAppendable := prometheus.NewFanout(args.ForwardTo, o.ID) scrapeOptions := &scrape.Options{ExtraMetrics: args.ExtraMetrics} scraper := scrape.NewManager(scrapeOptions, o.Logger, flowAppendable) c := &Component{ diff --git a/component/prometheus/scrape/scrape_test.go b/component/prometheus/scrape/scrape_test.go index c94ccd991646..96f5bc7cd3aa 100644 --- a/component/prometheus/scrape/scrape_test.go +++ b/component/prometheus/scrape/scrape_test.go @@ -42,11 +42,16 @@ func TestForwardingToAppendable(t *testing.T) { // Update the component with a mock receiver; it should be passed along to the Appendable. var receivedTs int64 var receivedSamples labels.Labels - fanout := prometheus.NewFanout(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error) { - receivedTs = t - receivedSamples = l - return ref, l, t, v, nil - }, nil, "1") + fanout, err := prometheus.NewInterceptor( + func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + receivedTs = t + receivedSamples = l + return ref, nil + }, + nil, + "1", + ) + require.NoError(t, err) args.ForwardTo = []storage.Appendable{fanout} err = s.Update(args) require.NoError(t, err) From 05f1c250f3c2405acd7ad0916dd04fcdefc0ea7b Mon Sep 17 00:00:00 2001 From: Matt Durham Date: Tue, 1 Nov 2022 10:52:45 -0400 Subject: [PATCH 12/12] Simplify the remote_write and add multierror. --- component/prometheus/fanout.go | 10 ++- .../prometheus/remotewrite/appendable.go | 68 ------------------- .../prometheus/remotewrite/remote_write.go | 25 +++++-- 3 files changed, 28 insertions(+), 75 deletions(-) delete mode 100644 component/prometheus/remotewrite/appendable.go diff --git a/component/prometheus/fanout.go b/component/prometheus/fanout.go index 54163b77447b..47671d20c731 100644 --- a/component/prometheus/fanout.go +++ b/component/prometheus/fanout.go @@ -5,6 +5,8 @@ import ( "fmt" "sync" + "github.com/hashicorp/go-multierror" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" @@ -68,10 +70,14 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo if ref == 0 { ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l)) } + var multiErr error for _, x := range a.children { - _, _ = x.Append(ref, l, t, v) + _, err := x.Append(ref, l, t, v) + if err != nil { + multiErr = multierror.Append(multiErr, err) + } } - return ref, nil + return ref, multiErr } // Commit satisfies the Appender interface. diff --git a/component/prometheus/remotewrite/appendable.go b/component/prometheus/remotewrite/appendable.go deleted file mode 100644 index f324665df734..000000000000 --- a/component/prometheus/remotewrite/appendable.go +++ /dev/null @@ -1,68 +0,0 @@ -package remotewrite - -import ( - "fmt" - - "github.com/grafana/agent/component/prometheus" - "github.com/prometheus/prometheus/model/exemplar" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/model/metadata" - "github.com/prometheus/prometheus/storage" - "golang.org/x/net/context" -) - -var _ storage.Appendable = (*appendable)(nil) -var _ storage.Appender = (*appender)(nil) - -type appendable struct { - inner storage.Appendable - componentID string -} - -// Appender satisfies the Appendable interface. -func (a *appendable) Appender(ctx context.Context) storage.Appender { - app := &appender{ - child: a.inner.Appender(ctx), - componentID: a.componentID, - } - return app -} - -type appender struct { - child storage.Appender - componentID string -} - -// Append satisfies the Appender interface. -func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { - // Conversion is needed because remote_writes assume they own all the IDs, so if you have two remote_writes they will - // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id - // to a local (remote_write) id. - localID := prometheus.GlobalRefMapping.GetLocalRefID(a.componentID, uint64(ref)) - newref, err := a.child.Append(storage.SeriesRef(localID), l, t, v) - // If there was no local id we need to propagate it. - if localID == 0 { - prometheus.GlobalRefMapping.GetOrAddLink(a.componentID, uint64(newref), l) - } - return ref, err -} - -// Commit satisfies the Appender interface. -func (a *appender) Commit() error { - return a.child.Commit() -} - -// Rollback satisfies the Appender interface. -func (a *appender) Rollback() error { - return a.child.Rollback() -} - -// AppendExemplar satisfies the Appender interface. -func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { - return 0, fmt.Errorf("appendExemplar not supported yet") -} - -// UpdateMetadata satisfies the Appender interface. -func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { - return 0, fmt.Errorf("updateMetadata not supported yet") -} diff --git a/component/prometheus/remotewrite/remote_write.go b/component/prometheus/remotewrite/remote_write.go index a08648194c6a..db879d17f5e9 100644 --- a/component/prometheus/remotewrite/remote_write.go +++ b/component/prometheus/remotewrite/remote_write.go @@ -8,6 +8,10 @@ import ( "sync" "time" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/agent/component/prometheus" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/agent/component" @@ -48,7 +52,7 @@ type Component struct { mut sync.RWMutex cfg Arguments - receiver *appendable + receiver *prometheus.Interceptor } // NewComponent creates a new prometheus.remote_write component. @@ -70,11 +74,22 @@ func NewComponent(o component.Options, c Arguments) (*Component, error) { remoteStore: remoteStore, storage: storage.NewFanout(o.Logger, walStorage, remoteStore), } - res.receiver = &appendable{ - inner: res.storage, - componentID: o.ID, + res.receiver, err = prometheus.NewInterceptor(func(ref storage.SeriesRef, l labels.Labels, t int64, v float64, next storage.Appender) (storage.SeriesRef, error) { + // Conversion is needed because remote_writes assume they own all the IDs, so if you have two remote_writes they will + // both assume they have only one scraper attached. In flow that is not true, so we need to translate from a global id + // to a local (remote_write) id. + localID := prometheus.GlobalRefMapping.GetLocalRefID(res.opts.ID, uint64(ref)) + newref, nextErr := next.Append(storage.SeriesRef(localID), l, t, v) + // If there was no local id we need to propagate it. + if localID == 0 { + prometheus.GlobalRefMapping.GetOrAddLink(res.opts.ID, uint64(newref), l) + } + // return the original ref since this needs to be a globalref id and not the local one. + return ref, nextErr + }, res.storage, o.ID) + if err != nil { + return nil, err } - // Immediately export the receiver which remains the same for the component // lifetime. o.OnStateChange(Exports{Receiver: res.receiver})