From b3d13c79243f0052986ecc2dd256fc9ad10ad845 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 12 Nov 2019 14:15:20 +0100 Subject: [PATCH] pkg/ingester: handle labels mapping to the same fast fingerprint. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Uses slightly adapted fpMapper code from Cortex. Fixes issue #898 Signed-off-by: Peter Štibraný --- pkg/ingester/flush.go | 9 +- pkg/ingester/instance.go | 32 ++++-- pkg/ingester/instance_test.go | 41 ++++++++ pkg/ingester/mapper.go | 187 ++++++++++++++++++++++++++++++++++ pkg/ingester/mapper_test.go | 137 +++++++++++++++++++++++++ pkg/ingester/stream.go | 17 ++-- 6 files changed, 402 insertions(+), 21 deletions(-) create mode 100644 pkg/ingester/instance_test.go create mode 100644 pkg/ingester/mapper.go create mode 100644 pkg/ingester/mapper_test.go diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 370c86ec4dcf..6d03051009bc 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -15,7 +15,6 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" "github.com/grafana/loki/pkg/chunkenc" loki_util "github.com/grafana/loki/pkg/util" @@ -206,7 +205,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat return nil } -func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelAdapter) { +func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels) { instance.streamsMtx.Lock() defer instance.streamsMtx.Unlock() @@ -261,19 +260,19 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { if len(stream.chunks) == 0 { delete(instance.streams, stream.fp) - 
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp) + instance.index.Delete(stream.labels, stream.fp) instance.streamsRemovedTotal.Inc() memoryStreams.Dec() } } -func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs []client.LabelAdapter, cs []*chunkDesc) error { +func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error { userID, err := user.ExtractOrgID(ctx) if err != nil { return err } - labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(labelPairs)) + labelsBuilder := labels.NewBuilder(labelPairs) labelsBuilder.Set(nameLabel, logsValue) metric := labelsBuilder.Labels() diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index a10fe6e9a4e2..7d7949b5a96e 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -52,6 +52,7 @@ type instance struct { streamsMtx sync.RWMutex streams map[model.Fingerprint]*stream index *index.InvertedIndex + mapper *fpMapper // mapper must only be used with streamsMtx held, because it calls back into the instance to read streams instanceID string @@ -66,7 +67,7 @@ type instance struct { } func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance { - return &instance{ + i := &instance{ streams: map[model.Fingerprint]*stream{}, index: index.New(), instanceID: instanceID, @@ -78,6 +79,8 @@ func newInstance(instanceID string, blockSize int, limits *validation.Overrides) tailers: map[uint32]*tailer{}, limits: limits, } + i.mapper = newFPMapper(i.getLabelsFromFingerprint) + return i } // consumeChunk manually adds a chunk that was received during ingester chunk @@ -86,11 +89,13 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte i.streamsMtx.Lock() defer i.streamsMtx.Unlock() - fp := client.FastFingerprint(labels) + rawFp := client.FastFingerprint(labels) + fp := i.mapper.mapFP(rawFp, labels) + stream, ok := i.streams[fp] if !ok { - stream = newStream(fp, 
labels, i.blockSize) - i.index.Add(labels, fp) + sortedLabels := i.index.Add(labels, fp) + stream = newStream(fp, sortedLabels, i.blockSize) i.streams[fp] = stream i.streamsCreatedTotal.Inc() memoryStreams.Inc() @@ -136,7 +141,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { } func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) { - fp := client.FastFingerprint(labels) + rawFp := client.FastFingerprint(labels) + fp := i.mapper.mapFP(rawFp, labels) stream, ok := i.streams[fp] if ok { @@ -146,8 +152,8 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) { return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID)) } - stream = newStream(fp, labels, i.blockSize) - i.index.Add(labels, fp) + sortedLabels := i.index.Add(labels, fp) + stream = newStream(fp, sortedLabels, i.blockSize) i.streams[fp] = stream memoryStreams.Inc() i.streamsCreatedTotal.Inc() @@ -156,6 +162,15 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err return stream, nil } +// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx. 
+func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels { + s := i.streams[fp] + if s == nil { + return nil + } + return s.labels +} + func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector() if err != nil { @@ -210,9 +225,8 @@ outer: if !ok { return nil, ErrStreamMissing } - lbs := client.FromLabelAdaptersToLabels(stream.labels) for _, filter := range filters { - if !filter.Matches(lbs.Get(filter.Name)) { + if !filter.Matches(stream.labels.Get(filter.Name)) { continue outer } } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go new file mode 100644 index 000000000000..fbb615ba1c74 --- /dev/null +++ b/pkg/ingester/instance_test.go @@ -0,0 +1,41 @@ +package ingester + +import ( + "context" + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + + "github.com/grafana/loki/pkg/util/validation" + "github.com/stretchr/testify/require" +) + +func TestLabelsCollisions(t *testing.T) { + o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000}) + require.NoError(t, err) + + i := newInstance("test", 512, o) + + tt := time.Now().Add(-5 * time.Minute) + + // Notice how labels aren't sorted. 
+ err = i.Push(context.Background(), &logproto.PushRequest{Streams: []*logproto.Stream{ + // both label sets have FastFingerprint=e002a3a451262627 + {Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"1\"}", Entries: entries(tt.Add(time.Minute))}, + {Labels: "{uniq0=\"1\",app=\"m\",uniq1=\"1\"}", Entries: entries(tt)}, + + // e002a3a451262247 + {Labels: "{app=\"l\",uniq0=\"1\",uniq1=\"0\"}", Entries: entries(tt.Add(time.Minute))}, + {Labels: "{uniq1=\"0\",app=\"m\",uniq0=\"0\"}", Entries: entries(tt)}, + + // e002a2a4512624f4 + {Labels: "{app=\"l\",uniq0=\"0\",uniq1=\"0\"}", Entries: entries(tt.Add(time.Minute))}, + {Labels: "{uniq0=\"1\",uniq1=\"0\",app=\"m\"}", Entries: entries(tt)}, + }}) + require.NoError(t, err) +} + +func entries(time time.Time) []logproto.Entry { + return []logproto.Entry{{Timestamp: time, Line: "hello"}} +} diff --git a/pkg/ingester/mapper.go b/pkg/ingester/mapper.go new file mode 100644 index 000000000000..5510ac614f84 --- /dev/null +++ b/pkg/ingester/mapper.go @@ -0,0 +1,187 @@ +package ingester + +import ( + "fmt" + "sort" + "strings" + "sync" + "sync/atomic" + + "github.com/prometheus/prometheus/pkg/labels" + + "github.com/cortexproject/cortex/pkg/ingester/client" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" +) + +const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping. + +var separatorString = string([]byte{model.SeparatorByte}) + +// fpMapper is used to map fingerprints in order to work around fingerprint +// collisions. +type fpMapper struct { + // highestMappedFP has to be aligned for atomic operations. + highestMappedFP model.Fingerprint + + mtx sync.RWMutex // Protects mappings. + // maps original fingerprints to a map of string representations of + // metrics to the truly unique fingerprint. + mappings map[model.Fingerprint]map[string]model.Fingerprint + + // Returns existing labels for given fingerprint, if any. 
+ // Equality check relies on labels.Labels being sorted. + fpToLabels func(fingerprint model.Fingerprint) labels.Labels +} + +// newFPMapper returns an fpMapper ready to use. +func newFPMapper(fpToLabels func(fingerprint model.Fingerprint) labels.Labels) *fpMapper { + if fpToLabels == nil { + panic("nil fpToLabels") + } + + return &fpMapper{ + fpToLabels: fpToLabels, + mappings: map[model.Fingerprint]map[string]model.Fingerprint{}, + } +} + +// mapFP takes a raw fingerprint (as returned by client.FastFingerprint) and +// returns a truly unique fingerprint. The caller must hold the lock that +// guards the data read by fpToLabels (streamsMtx when used by instance). +func (m *fpMapper) mapFP(fp model.Fingerprint, metric []client.LabelAdapter) model.Fingerprint { + // First check if we are in the reserved FP space, in which case this is + // automatically a collision that has to be mapped. + if fp <= maxMappedFP { + return m.maybeAddMapping(fp, metric) + } + + // Then check the most likely case: This fp belongs to a series that is + // already in memory. + s := m.fpToLabels(fp) + if s != nil { + // FP exists in memory, but is it for the same metric? + if equalLabels(metric, s) { + // Yes. We are done. + return fp + } + // Collision detected! + return m.maybeAddMapping(fp, metric) + } + // Metric is not in memory. Check if we have a mapping for this metric + // in place already, left over from an earlier collision. + m.mtx.RLock() + mappedFPs, fpAlreadyMapped := m.mappings[fp] + m.mtx.RUnlock() + if fpAlreadyMapped { + // We indeed have mapped fp historically. + ms := metricToUniqueString(metric) + // fp is locked by the caller, so no further locking of + // 'mappedFPs' required (it is specific to fp). + mappedFP, ok := mappedFPs[ms] + if ok { + // Historical mapping found, return the mapped FP. 
+ return mappedFP + } + } + return fp +} + +func valueForName(s labels.Labels, name string) (string, bool) { + pos := sort.Search(len(s), func(i int) bool { return s[i].Name >= name }) + if pos == len(s) || s[pos].Name != name { + return "", false + } + return s[pos].Value, true +} + +// equalLabels reports whether a and b contain the same name/value pairs. b must be sorted by name, as it is binary-searched; a may be in any order. +func equalLabels(a []client.LabelAdapter, b labels.Labels) bool { + if len(a) != len(b) { + return false + } + // Check as many as we can where the two sets are in the same order + i := 0 + for ; i < len(a); i++ { + if b[i].Name != a[i].Name { + break + } + if b[i].Value != a[i].Value { + return false + } + } + // Now check remaining values using binary search + for ; i < len(a); i++ { + v, found := valueForName(b, a[i].Name) + if !found || v != a[i].Value { + return false + } + } + return true +} + +// maybeAddMapping is only used internally. It takes a detected collision and +// adds it to the mappings map if not yet there. In any case, it returns the +// truly unique fingerprint for the colliding metric. +func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []client.LabelAdapter) model.Fingerprint { + ms := metricToUniqueString(collidingMetric) + m.mtx.RLock() + mappedFPs, ok := m.mappings[fp] + m.mtx.RUnlock() + if ok { + // fp is locked by the caller, so no further locking required. + mappedFP, ok := mappedFPs[ms] + if ok { + return mappedFP // Existing mapping. + } + // A new mapping has to be created. + mappedFP = m.nextMappedFP() + mappedFPs[ms] = mappedFP + level.Info(util.Logger).Log( + "msg", "fingerprint collision detected, mapping to new fingerprint", + "old_fp", fp, + "new_fp", mappedFP, + "metric", collidingMetric, + ) + return mappedFP + } + // This is the first collision for fp. 
+ mappedFP := m.nextMappedFP() + mappedFPs = map[string]model.Fingerprint{ms: mappedFP} + m.mtx.Lock() + m.mappings[fp] = mappedFPs + m.mtx.Unlock() + level.Info(util.Logger).Log( + "msg", "fingerprint collision detected, mapping to new fingerprint", + "old_fp", fp, + "new_fp", mappedFP, + "metric", collidingMetric, + ) + return mappedFP +} + +func (m *fpMapper) nextMappedFP() model.Fingerprint { + mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1)) + if mappedFP > maxMappedFP { + panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP)) + } + return mappedFP +} + +// metricToUniqueString turns a metric into a string in a reproducible and +// unique way, i.e. the same metric will always create the same string, and +// different metrics will always create different strings. In a way, it is the +// "ideal" fingerprint function, only that it is more expensive than the +// FastFingerprint function, and its result is not suitable as a key for maps +// and indexes as it might become really large, causing a lot of hashing effort +// in maps and a lot of storage overhead in indexes. +func metricToUniqueString(m []client.LabelAdapter) string { + parts := make([]string, 0, len(m)) + for _, pair := range m { + parts = append(parts, pair.Name+separatorString+pair.Value) + } + sort.Strings(parts) + return strings.Join(parts, separatorString) +} diff --git a/pkg/ingester/mapper_test.go b/pkg/ingester/mapper_test.go new file mode 100644 index 000000000000..1662029e06f7 --- /dev/null +++ b/pkg/ingester/mapper_test.go @@ -0,0 +1,137 @@ +package ingester + +import ( + "sort" + "testing" + + "github.com/cortexproject/cortex/pkg/ingester/client" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" +) + +var ( + // cm11, cm12, cm13 are colliding with fp1. + // cm21, cm22 are colliding with fp2. + // cm31, cm32 are colliding with fp3, which is below maxMappedFP. 
+ // Note that fingerprints are set and not actually calculated. + // The collision detection is independent from the actually used + // fingerprinting algorithm. + fp1 = model.Fingerprint(maxMappedFP + 1) + fp2 = model.Fingerprint(maxMappedFP + 2) + fp3 = model.Fingerprint(1) + cm11 = []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, + {Name: "dings", Value: "bumms"}, + } + cm12 = []client.LabelAdapter{ + {Name: "bar", Value: "foo"}, + } + cm13 = []client.LabelAdapter{ + {Name: "foo", Value: "bar"}, + } + cm21 = []client.LabelAdapter{ + {Name: "foo", Value: "bumms"}, + {Name: "dings", Value: "bar"}, + } + cm22 = []client.LabelAdapter{ + {Name: "dings", Value: "foo"}, + {Name: "bar", Value: "bumms"}, + } + cm31 = []client.LabelAdapter{ + {Name: "bumms", Value: "dings"}, + } + cm32 = []client.LabelAdapter{ + {Name: "bumms", Value: "dings"}, + {Name: "bar", Value: "foo"}, + } +) + +func copyValuesAndSort(a []client.LabelAdapter) labels.Labels { + c := make(labels.Labels, len(a)) + for i, pair := range a { + c[i].Name = pair.Name + c[i].Value = pair.Value + } + sort.Sort(c) + return c +} + +func TestFPMapper(t *testing.T) { + sm := map[model.Fingerprint]labels.Labels{} + + mapper := newFPMapper(func(fp model.Fingerprint) labels.Labels { + return sm[fp] + }) + + // Everything is empty, resolving a FP should do nothing. + assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), fp1) + + // cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve + // the collision. + sm[fp1] = copyValuesAndSort(cm11) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) + + // The mapped cm12 is added to sm, too. That should not change the outcome. 
+ sm[model.Fingerprint(1)] = copyValuesAndSort(cm12) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) + + // Now map cm13, should reproducibly result in the next mapped FP. + assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) + + // Add cm13 to sm. Should not change anything. + sm[model.Fingerprint(2)] = copyValuesAndSort(cm13) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) + + // Now add cm21 and cm22 in the same way, checking the mapped FPs. + assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) + sm[fp2] = copyValuesAndSort(cm21) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) + sm[model.Fingerprint(3)] = copyValuesAndSort(cm22) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) + + // Map cm31, resulting in a mapping straight away. + assertFingerprintEqual(t, mapper.mapFP(fp3, cm31), model.Fingerprint(4)) + sm[model.Fingerprint(4)] = copyValuesAndSort(cm31) + + // Map cm32, which is now mapped for two reasons... + assertFingerprintEqual(t, mapper.mapFP(fp3, cm32), model.Fingerprint(5)) + sm[model.Fingerprint(5)] = copyValuesAndSort(cm32) + + // Now check ALL the mappings, just to be sure. 
+ assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) + assertFingerprintEqual(t, mapper.mapFP(fp3, cm31), model.Fingerprint(4)) + assertFingerprintEqual(t, mapper.mapFP(fp3, cm32), model.Fingerprint(5)) + + // Remove all the fingerprints from sm, which should change nothing, as + // the existing mappings stay and should be detected. + delete(sm, fp1) + delete(sm, fp2) + delete(sm, fp3) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) + assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) + assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) + assertFingerprintEqual(t, mapper.mapFP(fp3, cm31), model.Fingerprint(4)) + assertFingerprintEqual(t, mapper.mapFP(fp3, cm32), model.Fingerprint(5)) +} + +// assertFingerprintEqual asserts that two fingerprints are equal. 
+func assertFingerprintEqual(t *testing.T, gotFP, wantFP model.Fingerprint) { + if gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } +} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 095f5137cbd9..6e34688dce19 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -8,7 +8,10 @@ import ( "sync" "time" - "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" @@ -45,7 +48,7 @@ type stream struct { // Not thread-safe; assume accesses to this are locked by caller. chunks []chunkDesc fp model.Fingerprint - labels []client.LabelAdapter + labels labels.Labels blockSize int tailers map[uint32]*tailer @@ -65,7 +68,7 @@ type entryWithError struct { e error } -func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int) *stream { +func newStream(fp model.Fingerprint, labels labels.Labels, blockSize int) *stream { return &stream{ fp: fp, labels: labels, @@ -126,7 +129,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { if len(storedEntries) != 0 { go func() { - stream := logproto.Stream{Labels: client.FromLabelAdaptersToLabels(s.labels).String(), Entries: storedEntries} + stream := logproto.Stream{Labels: s.labels.String(), Entries: storedEntries} closedTailers := []uint32{} @@ -156,7 +159,7 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { if lastEntryWithErr.e == chunkenc.ErrOutOfOrder { // return bad http status request response with all failed entries buf := bytes.Buffer{} - streamName := client.FromLabelAdaptersToLabels(s.labels).String() + streamName := s.labels.String() for _, entryWithError := range failedEntriesWithError { _, _ = fmt.Fprintf(&buf, @@ -193,7 +196,7 @@ func (s *stream) Iterator(from, 
through time.Time, direction logproto.Direction, } } - return iter.NewNonOverlappingIterator(iterators, client.FromLabelAdaptersToLabels(s.labels).String()), nil + return iter.NewNonOverlappingIterator(iterators, s.labels.String()), nil } func (s *stream) addTailer(t *tailer) { @@ -204,6 +207,6 @@ func (s *stream) addTailer(t *tailer) { } func (s *stream) matchesTailer(t *tailer) bool { - metric := client.FromLabelAdaptersToMetric(s.labels) + metric := util.LabelsToMetric(s.labels) return t.isWatchingLabels(metric) }