Skip to content

Commit

Permalink
pkg/ingester: handle labels mapping to the same fast fingerprint.
Browse files Browse the repository at this point in the history
Uses slightly adapted fpMapper code from Cortex.

Fixes issue #898

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany committed Nov 12, 2019
1 parent 3787596 commit b3d13c7
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 21 deletions.
9 changes: 4 additions & 5 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/chunkenc"
loki_util "github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -206,7 +205,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
return nil
}

func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelAdapter) {
func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

Expand Down Expand Up @@ -261,19 +260,19 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp)
instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.Dec()
}
}

func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs []client.LabelAdapter, cs []*chunkDesc) error {
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}

labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(labelPairs))
labelsBuilder := labels.NewBuilder(labelPairs)
labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels()

Expand Down
32 changes: 23 additions & 9 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type instance struct {
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream
index *index.InvertedIndex
mapper *fpMapper // using the mapper requires holding streamsMtx, because it calls back into the instance to read streams

instanceID string

Expand All @@ -66,7 +67,7 @@ type instance struct {
}

func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance {
return &instance{
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,
Expand All @@ -78,6 +79,8 @@ func newInstance(instanceID string, blockSize int, limits *validation.Overrides)
tailers: map[uint32]*tailer{},
limits: limits,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
}

// consumeChunk manually adds a chunk that was received during ingester chunk
Expand All @@ -86,11 +89,13 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

fp := client.FastFingerprint(labels)
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

stream, ok := i.streams[fp]
if !ok {
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
Expand Down Expand Up @@ -136,7 +141,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}

func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
fp := client.FastFingerprint(labels)
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

stream, ok := i.streams[fp]
if ok {
Expand All @@ -146,8 +152,8 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
}
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
Expand All @@ -156,6 +162,15 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}

// getLabelsFromFingerprint looks up the stream registered under fp and
// returns its labels, or nil when no such stream exists. It is handed to the
// fingerprint mapper as its lookup callback. The caller must hold streamsMtx.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
	if existing := i.streams[fp]; existing != nil {
		return existing.labels
	}
	return nil
}

func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
Expand Down Expand Up @@ -210,9 +225,8 @@ outer:
if !ok {
return nil, ErrStreamMissing
}
lbs := client.FromLabelAdaptersToLabels(stream.labels)
for _, filter := range filters {
if !filter.Matches(lbs.Get(filter.Name)) {
if !filter.Matches(stream.labels.Get(filter.Name)) {
continue outer
}
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ingester

import (
"context"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"

"github.com/grafana/loki/pkg/util/validation"
"github.com/stretchr/testify/require"
)

// TestLabelsCollisions pushes pairs of distinct label sets that share the same
// FastFingerprint into a single instance and checks that the push succeeds.
func TestLabelsCollisions(t *testing.T) {
	overrides, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
	require.NoError(t, err)

	inst := newInstance("test", 512, overrides)

	earlier := time.Now().Add(-5 * time.Minute)
	later := earlier.Add(time.Minute)

	// Notice how labels aren't sorted.
	streams := []*logproto.Stream{
		// both label sets have FastFingerprint=e002a3a451262627
		{Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"1\"}", Entries: entries(later)},
		{Labels: "{uniq0=\"1\",app=\"m\",uniq1=\"1\"}", Entries: entries(earlier)},

		// e002a3a451262247
		{Labels: "{app=\"l\",uniq0=\"1\",uniq1=\"0\"}", Entries: entries(later)},
		{Labels: "{uniq1=\"0\",app=\"m\",uniq0=\"0\"}", Entries: entries(earlier)},

		// e002a2a4512624f4
		{Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"0\"}", Entries: entries(later)},
		{Labels: "{uniq0=\"1\",uniq1=\"0\",app=\"m\"}", Entries: entries(earlier)},
	}

	err = inst.Push(context.Background(), &logproto.PushRequest{Streams: streams})
	require.NoError(t, err)
}

// entries returns a one-element slice holding the log line "hello" stamped
// with ts. The parameter is deliberately not called "time" so that it does not
// shadow the imported time package inside the function.
func entries(ts time.Time) []logproto.Entry {
	return []logproto.Entry{{Timestamp: ts, Line: "hello"}}
}
187 changes: 187 additions & 0 deletions pkg/ingester/mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package ingester

import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"

"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/ingester/client"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
)

// maxMappedFP is the upper bound (inclusive) of the fingerprint range reserved
// for collision mappings. Raw fingerprints at or below this value are always
// remapped, and nextMappedFP panics once the counter exceeds it.
const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping.

// separatorString is model.SeparatorByte as a one-byte string. It is used by
// metricToUniqueString both between a label's name and value and between
// consecutive label pairs.
var separatorString = string([]byte{model.SeparatorByte})

// fpMapper is used to map fingerprints in order to work around fingerprint
// collisions. A label set whose raw fingerprint clashes with a different label
// set is assigned a replacement fingerprint taken from the reserved range
// 1..maxMappedFP.
type fpMapper struct {
// highestMappedFP is the last mapped fingerprint handed out. It is advanced
// with sync/atomic and therefore has to be 64-bit aligned for atomic
// operations; keep it as the first field of the struct.
highestMappedFP model.Fingerprint

mtx sync.RWMutex // Protects mappings (the outer map only; inner maps are accessed without mtx and rely on the caller's locking).
// maps original fingerprints to a map of string representations of
// metrics to the truly unique fingerprint.
mappings map[model.Fingerprint]map[string]model.Fingerprint

// Returns existing labels for given fingerprint, if any (nil otherwise).
// Equality check relies on labels.Labels being sorted.
fpToLabels func(fingerprint model.Fingerprint) labels.Labels
}

// newFPMapper builds an fpMapper with an empty mapping table. fpToLabels is
// the callback used to fetch the labels currently stored under a fingerprint;
// passing nil is a programming error and panics.
func newFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *fpMapper {
	if fpToLabels == nil {
		panic("nil fpToLabels")
	}

	mapper := &fpMapper{
		mappings: make(map[model.Fingerprint]map[string]model.Fingerprint),
	}
	mapper.fpToLabels = fpToLabels
	return mapper
}

// mapFP translates a raw fingerprint (as computed by the caller for metric)
// into a fingerprint that is unique to that label set:
//
//   - raw fingerprints inside the reserved range are always remapped;
//   - if fp is currently in use by the same label set, fp is returned as is;
//   - if fp is in use by a different label set, a mapping is looked up or
//     created;
//   - if fp is not in use, a previously recorded mapping for this label set is
//     reused when present, otherwise fp is returned unchanged.
//
// The per-fingerprint inner maps are read without taking m.mtx, so callers
// must serialize access for a given raw fingerprint themselves.
func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) model.Fingerprint {
	// Anything in the reserved FP space is by definition a collision with
	// the mapped fingerprints and has to be mapped itself.
	if fp <= maxMappedFP {
		return m.maybeAddMapping(fp, metric)
	}

	// Most likely case: fp belongs to something that is already known.
	if existing := m.fpToLabels(fp); existing != nil {
		if !equalLabels(metric, existing) {
			// Same fingerprint, different labels: collision.
			return m.maybeAddMapping(fp, metric)
		}
		return fp
	}

	// Nothing is stored under fp right now. It may still have been mapped
	// earlier, in which case the earlier mapping has to be honoured.
	m.mtx.RLock()
	known, mapped := m.mappings[fp]
	m.mtx.RUnlock()
	if !mapped {
		return fp
	}

	// The inner map is specific to fp; see the locking note above.
	if unique, found := known[metricToUniqueString(metric)]; found {
		return unique
	}
	return fp
}

// valueForName binary-searches s, which must be sorted by label name, for a
// label called name. It returns the label's value and true when found, or the
// empty string and false otherwise.
func valueForName(s labels.Labels, name string) (string, bool) {
	idx := sort.Search(len(s), func(j int) bool {
		return s[j].Name >= name
	})
	if idx < len(s) && s[idx].Name == name {
		return s[idx].Value, true
	}
	return "", false
}

// equalLabels reports whether a and b hold the same name/value pairs. b must
// be sorted by name (it is searched with valueForName); a may be in any order.
func equalLabels(a []client.LabelAdapter, b labels.Labels) bool {
	if len(a) != len(b) {
		return false
	}

	// Walk the common prefix where both sides list names in the same order;
	// this avoids any searching when a happens to be sorted as well.
	n := 0
	for n < len(a) && a[n].Name == b[n].Name {
		if a[n].Value != b[n].Value {
			return false
		}
		n++
	}

	// Whatever is left of a is out of order relative to b: look each name up.
	for _, pair := range a[n:] {
		value, ok := valueForName(b, pair.Name)
		if !ok || value != pair.Value {
			return false
		}
	}
	return true
}

// maybeAddMapping is only used internally. Given a raw fingerprint fp that
// collides for collidingMetric, it returns the unique fingerprint recorded for
// that label set, allocating and recording a new one (and logging the
// collision) if none exists yet.
func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []client.LabelAdapter) model.Fingerprint {
	key := metricToUniqueString(collidingMetric)

	m.mtx.RLock()
	known, seenBefore := m.mappings[fp]
	m.mtx.RUnlock()

	if seenBefore {
		// The inner map is specific to fp, which the caller is expected to
		// have locked, so it is accessed without m.mtx.
		if existing, found := known[key]; found {
			return existing // Existing mapping.
		}
	}

	// A new mapping has to be created.
	fresh := m.nextMappedFP()
	if seenBefore {
		known[key] = fresh
	} else {
		// This is the first collision for fp: install a new inner map.
		firstEntry := map[string]model.Fingerprint{key: fresh}
		m.mtx.Lock()
		m.mappings[fp] = firstEntry
		m.mtx.Unlock()
	}

	level.Info(util.Logger).Log(
		"msg", "fingerprint collision detected, mapping to new fingerprint",
		"old_fp", fp,
		"new_fp", fresh,
		"metric", collidingMetric,
	)
	return fresh
}

// nextMappedFP atomically advances highestMappedFP by one and returns the new
// value. It panics once the reserved range (maxMappedFP) is exhausted.
func (m *fpMapper) nextMappedFP() model.Fingerprint {
	counter := (*uint64)(&m.highestMappedFP)
	next := model.Fingerprint(atomic.AddUint64(counter, 1))
	if next <= maxMappedFP {
		return next
	}
	panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}

// metricToUniqueString renders a label set as a canonical string: every pair
// becomes name+separator+value, the pairs are sorted, and the result is joined
// with the same separator. The same label set therefore always produces the
// same string regardless of input order. It acts as an "ideal" but expensive
// fingerprint; the result can get large, so it is only used as a key in the
// collision mappings rather than as a general map or index key.
func metricToUniqueString(m []client.LabelAdapter) string {
	rendered := make([]string, len(m))
	for idx := range m {
		rendered[idx] = m[idx].Name + separatorString + m[idx].Value
	}
	sort.Strings(rendered)
	return strings.Join(rendered, separatorString)
}
Loading

0 comments on commit b3d13c7

Please sign in to comment.