Skip to content

Commit

Permalink
Convert to using prometheus appendable (#2431)
Browse files Browse the repository at this point in the history
* initial commit of switching over to using appendable

* comments

* simplify fanout

* Switch to using a common fanout appendable.

* Merge changes

* Fix lint

* Add check

* remove panics

* PR feedback

* fix error

* separate fanout and intercept patterns (#2447)

* separate fanout and intercept patterns

* testing flow

* Add middleware style next to interceptor.

* Simplify the remote_write and add multierror.
  • Loading branch information
mattdurham authored Nov 1, 2022
1 parent 37d333a commit 0513789
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 430 deletions.
111 changes: 0 additions & 111 deletions component/common/appendable/appendable.go

This file was deleted.

107 changes: 107 additions & 0 deletions component/prometheus/fanout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package prometheus

import (
"context"
"fmt"
"sync"

"github.com/hashicorp/go-multierror"

"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"

"github.com/prometheus/prometheus/storage"
)

var _ storage.Appendable = (*Fanout)(nil)

// Fanout supports the default Flow style of appendables since it can go to multiple outputs. It also allows the intercepting of appends.
type Fanout struct {
mut sync.RWMutex
// children is where to fan out.
children []storage.Appendable
// ComponentID is what component this belongs to.
componentID string
}

// NewFanout creates a fanout appendable.
func NewFanout(children []storage.Appendable, componentID string) *Fanout {
return &Fanout{
children: children,
componentID: componentID,
}
}

// UpdateChildren allows changing of the children of the fanout.
func (f *Fanout) UpdateChildren(children []storage.Appendable) {
f.mut.Lock()
defer f.mut.Unlock()
f.children = children
}

// Appender satisfies the Appendable interface.
func (f *Fanout) Appender(ctx context.Context) storage.Appender {
f.mut.RLock()
defer f.mut.RUnlock()

app := &appender{
children: make([]storage.Appender, 0),
componentID: f.componentID,
}
for _, x := range f.children {
if x == nil {
continue
}
app.children = append(app.children, x.Appender(ctx))
}
return app
}

var _ storage.Appender = (*appender)(nil)

type appender struct {
children []storage.Appender
componentID string
}

// Append satisfies the Appender interface.
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if ref == 0 {
ref = storage.SeriesRef(GlobalRefMapping.GetOrAddGlobalRefID(l))
}
var multiErr error
for _, x := range a.children {
_, err := x.Append(ref, l, t, v)
if err != nil {
multiErr = multierror.Append(multiErr, err)
}
}
return ref, multiErr
}

// Commit satisfies the Appender interface.
func (a *appender) Commit() error {
for _, x := range a.children {
_ = x.Commit()
}
return nil
}

// Rollback satisifies the Appender interface.
func (a *appender) Rollback() error {
for _, x := range a.children {
_, _ = x, a.Rollback()
}
return nil
}

// AppendExemplar satisfies the Appender interface.
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, fmt.Errorf("appendExemplar not supported yet")
}

// UpdateMetadata satisifies the Appender interface.
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, fmt.Errorf("updateMetadata not supported yet")
}
9 changes: 7 additions & 2 deletions component/prometheus/globalrefmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newGlobalRefMap() *GlobalRefMap {
}

// GetOrAddLink is called by a remote_write endpoint component to add mapping and get back the global id.
func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *FlowMetric) uint64 {
func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, lbls labels.Labels) uint64 {
g.mut.Lock()
defer g.mut.Unlock()

Expand All @@ -59,7 +59,7 @@ func (g *GlobalRefMap) GetOrAddLink(componentID string, localRefID uint64, fm *F
g.mappings[componentID] = m
}

labelHash := fm.labels.Hash()
labelHash := lbls.Hash()
globalID, found := g.labelsHashToGlobal[labelHash]
if found {
m.localToGlobal[localRefID] = globalID
Expand All @@ -79,6 +79,11 @@ func (g *GlobalRefMap) GetOrAddGlobalRefID(l labels.Labels) uint64 {
g.mut.Lock()
defer g.mut.Unlock()

// Guard against bad input.
if l == nil {
return 0
}

labelHash := l.Hash()
globalID, found := g.labelsHashToGlobal[labelHash]
if found {
Expand Down
25 changes: 8 additions & 17 deletions component/prometheus/globalrefmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ func TestAddingLocalMapping(t *testing.T) {
})

globalID := mapping.GetOrAddGlobalRefID(l)
fm := NewFlowMetric(globalID, l, 0)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l)
require.True(t, globalID == shouldBeSameGlobalID)
require.Len(t, mapping.labelsHashToGlobal, 1)
require.Len(t, mapping.mappings, 1)
Expand All @@ -67,9 +66,8 @@ func TestAddingLocalMappings(t *testing.T) {
})

globalID := mapping.GetOrAddGlobalRefID(l)
fm := NewFlowMetric(globalID, l, 0)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l)
require.True(t, globalID == shouldBeSameGlobalID)
require.True(t, globalID == shouldBeSameGlobalID2)
require.Len(t, mapping.labelsHashToGlobal, 1)
Expand All @@ -92,10 +90,8 @@ func TestAddingLocalMappingsWithoutCreatingGlobalUpfront(t *testing.T) {
Value: "test",
})

fm := NewFlowMetric(1, l, 0)

shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, fm)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, fm)
shouldBeSameGlobalID := mapping.GetOrAddLink("1", 1, l)
shouldBeSameGlobalID2 := mapping.GetOrAddLink("2", 1, l)
require.True(t, shouldBeSameGlobalID2 == shouldBeSameGlobalID)
require.Len(t, mapping.labelsHashToGlobal, 1)
require.Len(t, mapping.mappings, 2)
Expand All @@ -122,11 +118,8 @@ func TestStaleness(t *testing.T) {
Value: "test2",
})

fm := NewFlowMetric(0, l, 0)
fm2 := NewFlowMetric(0, l2, 0)

global1 := mapping.GetOrAddLink("1", 1, fm)
_ = mapping.GetOrAddLink("2", 1, fm2)
global1 := mapping.GetOrAddLink("1", 1, l)
_ = mapping.GetOrAddLink("2", 1, l2)
mapping.AddStaleMarker(global1, l)
require.Len(t, mapping.staleGlobals, 1)
require.Len(t, mapping.labelsHashToGlobal, 2)
Expand All @@ -145,9 +138,7 @@ func TestRemovingStaleness(t *testing.T) {
Value: "test",
})

fm := NewFlowMetric(0, l, 0)

global1 := mapping.GetOrAddLink("1", 1, fm)
global1 := mapping.GetOrAddLink("1", 1, l)
mapping.AddStaleMarker(global1, l)
require.Len(t, mapping.staleGlobals, 1)
mapping.RemoveStaleMarker(global1)
Expand Down
Loading

0 comments on commit 0513789

Please sign in to comment.