Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert to using prometheus appendable #2431

Merged
merged 13 commits into from
Nov 1, 2022
111 changes: 0 additions & 111 deletions component/common/appendable/appendable.go

This file was deleted.

132 changes: 132 additions & 0 deletions component/prometheus/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package prometheus

import (
"context"
"sync"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"

"github.com/prometheus/prometheus/storage"
)

var _ storage.Appendable = (*Fanout)(nil)

type intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error)

// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
type Fanout struct {
mut sync.RWMutex
// intercept allows one to intercept the series before it fans out to make any changes. If labels.Labels returns nil the series is not propagated.
// Intercept shouuld be thread safe and can be called across appenders.
intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error)

mattdurham marked this conversation as resolved.
Show resolved Hide resolved
// children is where to fan out.
children []storage.Appendable

// ComponentID is what component this belongs to.
componentID string
}

// NewFanout creates a fanout appendable.
func NewFanout(inter intercept, children []storage.Appendable, componentID string) *Fanout {
return &Fanout{
intercept: inter,
children: children,
componentID: componentID,
}
}

// UpdateChildren allows changing of the children of the fanout.
func (f *Fanout) UpdateChildren(children []storage.Appendable) {
f.mut.Lock()
defer f.mut.Unlock()
f.children = children
}

// Appender satisfies the Appendable interface.
func (f *Fanout) Appender(ctx context.Context) storage.Appender {
f.mut.RLock()
defer f.mut.RUnlock()

app := &appender{
children: make([]storage.Appender, len(f.children)),
intercept: f.intercept,
componentID: f.componentID,
}
for i, x := range f.children {
if x == nil {
continue
}
app.children[i] = x.Appender(ctx)
}
rfratto marked this conversation as resolved.
Show resolved Hide resolved
return app
}

var _ storage.Appender = (*appender)(nil)

type appender struct {
children []storage.Appender
componentID string
intercept func(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, labels.Labels, int64, float64, error)
}

// Append satisfies the Appender interface.
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if ref == 0 {
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
}
nRef := ref
nL := l
nT := t
nV := v
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
if a.intercept != nil {
var err error
nRef, nL, nT, nV, err = a.intercept(ref, l, t, v)
if err != nil {
return 0, err
}
}
for _, x := range a.children {
if x == nil || nL == nil {
continue
}
_, _ = x.Append(nRef, nL, nT, nV)
}
return ref, nil
}

// Commit satisfies the Appender interface.
func (a *appender) Commit() error {
for _, x := range a.children {
if x == nil {
continue
}
_ = x.Commit()
}
return nil
}

// Rollback satisifies the Appender interface.
func (a *appender) Rollback() error {
for _, x := range a.children {
if x == nil {
continue
}
_, _ = x, a.Rollback()
}
return nil
}

// AppendExemplar satisfies the Appender interface.
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
// TODO implement me
panic("implement me")
}

// UpdateMetadata satisifies the Appender interface.
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
// TODO implement me
panic("implement me")
}
mattdurham marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 6 additions & 2 deletions component/prometheus/globalrefmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newGlobalRefMap() *GlobalRefMap {
}

// GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id.
func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *FlowMetric) uint64 {
func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 {
g.mut.Lock()
defer g.mut.Unlock()

Expand All @@ -59,7 +59,7 @@ func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *F
g.mappings[componentID] = m
}

labelHash := fm.labels.Hash()
labelHash := lbls.Hash()
globalID, found := g.labelsHashToGlobal[labelHash]
if found {
m.localToGlobal[localRefID] = globalID
Expand All @@ -79,6 +79,10 @@ func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 {
g.mut.Lock()
defer g.mut.Unlock()

if l == nil {
return 0
}
rfratto marked this conversation as resolved.
Show resolved Hide resolved

labelHash := l.Hash()
globalID, found := g.labelsHashToGlobal[labelHash]
if found {
Expand Down
25 changes: 8 additions & 17 deletions component/prometheus/globalrefmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func TestAddingLocalMapping(t *testing.T) {
})

globalID := mapping.GetOrAddGlobalRefID(l)
fm := NewFlowMetric(globalID, l, 0)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l)
require.True(t, globalID == shouldBeSameGlobalID)
require.Len(t, mapping.labelsHashToGlobal, 1)
require.Len(t, mapping.mappings, 1)
Expand All @@ -67,9 +66,8 @@ func TestAddingLocalMappings(t *testing.T) {
})

globalID := mapping.GetOrAddGlobalRefID(l)
fm := NewFlowMetric(globalID, l, 0)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l)
require.True(t, globalID == shouldBeSameGlobalID)
require.True(t, globalID == shouldBeSameGlobalID2)
require.Len(t, mapping.labelsHashToGlobal, 1)
Expand All @@ -92,10 +90,8 @@ func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) {
Value: "test",
})

fm := NewFlowMetric(1, l, 0)

shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l)
require.True(t, shouldBeSameGlobalID2 == shouldBeSameGlobalID)
require.Len(t, mapping.labelsHashToGlobal, 1)
require.Len(t, mapping.mappings, 2)
Expand All @@ -122,11 +118,8 @@ func TestStaleness(t *testing.T) {
Value: "test2",
})

fm := NewFlowMetric(0, l, 0)
fm2 := NewFlowMetric(0, l2, 0)

global1 := mapping.GetOrAddLink("1", 1, fm)
_ = mapping.GetOrAddLink("2", 1, fm2)
global1 := mapping.GetOrAddLink("1", 1, l)
_ = mapping.GetOrAddLink("2", 1, l2)
mapping.AddStaleMarker(global1, l)
require.Len(t, mapping.staleGlobals, 1)
require.Len(t, mapping.labelsHashToGlobal, 2)
Expand All @@ -145,9 +138,7 @@ func TestRemovingStaleness(t *testing.T) {
Value: "test",
})

fm := NewFlowMetric(0, l, 0)

global1 := mapping.GetOrAddLink("1", 1, fm)
global1 := mapping.GetOrAddLink("1", 1, l)
mapping.AddStaleMarker(global1, l)
require.Len(t, mapping.staleGlobals, 1)
mapping.RemoveStaleMarker(global1)
Expand Down
64 changes: 0 additions & 64 deletions component/prometheus/receiver.go

This file was deleted.

Loading