Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Fix multi-interval series issue #897

Merged
merged 4 commits into from
May 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
if len(remoteReqs) > 0 {
wg.Add(1)
go func() {
// all errors returned returned are *response.Error.
// all errors returned are *response.Error.
series, err := s.getTargetsRemote(getCtx, remoteReqs)
if err != nil {
cancel()
Expand Down
19 changes: 7 additions & 12 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,6 @@ func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.G
}

func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64) ([]Series, error) {
seriesSet := make(map[string]Series)

result, err := s.MetricIndex.FindByTag(orgId, expressions, from)
if err != nil {
return nil, err
Expand All @@ -938,12 +936,14 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
default:
}

var allSeries []Series

for _, series := range result {
seriesSet[series.Path] = Series{
allSeries = append(allSeries, Series{
Pattern: series.Path,
Node: cluster.Manager.ThisNode(),
Series: []idx.Node{series},
}
})
}

data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
Expand All @@ -966,20 +966,15 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
return nil, err
}
for _, series := range resp.Metrics {
seriesSet[series.Path] = Series{
allSeries = append(allSeries, Series{
Pattern: series.Path,
Node: r.peer,
Series: []idx.Node{series},
}
})
}
}

series := make([]Series, 0, len(seriesSet))
for _, s := range seriesSet {
series = append(series, s)
}

return series, nil
return allSeries, nil
}

func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTags) {
Expand Down
46 changes: 33 additions & 13 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archi
if LogLevel < 2 {
log.Debug("metricDef with id %v already in index", point.MKey)
}
existing.LastUpdate = int64(point.Time)

if existing.LastUpdate < int64(point.Time) {
existing.LastUpdate = int64(point.Time)
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change (and the similar one below) are somewhat orthogonal, but as part of the interval change a rogue datapoint flying in could "reset" the LastUpdate time to an older time and make the data unfindable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very interesting. note that the tank (AggMetric) wouldn't add this point (unless it is accepted by the reorder buffer), which makes this case even more interesting.
Also, this only seems to be a problem if this "buggy, old" point is not followed by a correct, recent point.
you're the first one I know with a stream this broken, but sure, we may as well protect against it :)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. This didn't actually happen to us, but it was my leading guess of what happened. Turns out I was wrong. The issue we had was that during a backfill, the LastUpdate in cassandra doesn't get set correctly (because the data stopped and didn't cause a flush). It's a mega corner case though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, comparing Find with FindByTag and checking for how they differ is an interesting exercise. I found another bug which we can fix later: #899
probably no one has noticed because no one uses the feature (except worldping which is currently untagged)

existing.Partition = partition
statUpdate.Inc()
statUpdateDuration.Value(time.Since(pre))
Expand All @@ -244,7 +247,9 @@ func (m *MemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, parti
if ok {
oldPart := existing.Partition
log.Debug("metricDef with id %s already in index.", mkey)
existing.LastUpdate = data.Time
if existing.LastUpdate < int64(data.Time) {
existing.LastUpdate = int64(data.Time)
}
existing.Partition = partition
statUpdate.Inc()
statUpdateDuration.Value(time.Since(pre))
Expand Down Expand Up @@ -831,8 +836,9 @@ func (m *MemoryIdx) FindByTag(orgId uint32, expressions []string, from int64) ([
m.RLock()
defer m.RUnlock()

// construct the output slice of idx.Node's such that there is only 1 idx.Node for each path
ids := m.idsByTagQuery(orgId, query)
res := make([]idx.Node, 0, len(ids))
byPath := make(map[string]*idx.Node)
for id := range ids {
def, ok := m.defById[id]
if !ok {
Expand All @@ -841,14 +847,25 @@ func (m *MemoryIdx) FindByTag(orgId uint32, expressions []string, from int64) ([
continue
}

res = append(res, idx.Node{
Path: def.NameWithTags(),
Leaf: true,
HasChildren: false,
Defs: []idx.Archive{*def},
})
if existing, ok := byPath[def.NameWithTags()]; !ok {
byPath[def.NameWithTags()] = &idx.Node{
Path: def.NameWithTags(),
Leaf: true,
HasChildren: false,
Defs: []idx.Archive{*def},
}
} else {
existing.Defs = append(existing.Defs, *def)
}
}

results := make([]idx.Node, 0, len(byPath))

for _, v := range byPath {
results = append(results, *v)
}
return res, nil

return results, nil
}

func (m *MemoryIdx) idsByTagQuery(orgId uint32, query TagQuery) IdSet {
Expand Down Expand Up @@ -877,11 +894,13 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
}
log.Debug("memory-idx: %d nodes matching pattern %s found", len(matchedNodes), pattern)
results := make([]idx.Node, 0)
seen := make(map[string]struct{})
byPath := make(map[string]struct{})
// construct the output slice of idx.Node's such that there is only 1 idx.Node
// for each path, and it holds all defs that the Node refers too.
// if there are public (orgId OrgIdPublic) and private leaf nodes with the same series
// path, then the public metricDefs will be excluded.
for _, n := range matchedNodes {
if _, ok := seen[n.Path]; !ok {
if _, ok := byPath[n.Path]; !ok {
idxNode := idx.Node{
Path: n.Path,
Leaf: n.Leaf(),
Expand All @@ -904,7 +923,7 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
}
}
results = append(results, idxNode)
seen[n.Path] = struct{}{}
byPath[n.Path] = struct{}{}
} else {
log.Debug("memory-idx: path %s already seen", n.Path)
}
Expand All @@ -914,6 +933,7 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
return results, nil
}

// find returns all Nodes matching the pattern for the given orgId
func (m *MemoryIdx) find(orgId uint32, pattern string) ([]*Node, error) {
tree, ok := m.tree[orgId]
if !ok {
Expand Down
21 changes: 20 additions & 1 deletion idx/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ func testGetAddKey(t *testing.T) {
So(defs, ShouldHaveLength, 15)
})
})

if TagSupport {
Convey("When adding metricDefs with the same series name as existing metricDefs (tagged)", t, func() {
Convey("then findByTag", func() {
nodes, err := ix.FindByTag(1, []string{"name!="}, 0)
So(err, ShouldBeNil)
defs := make([]idx.Archive, 0, len(nodes))
for i := range nodes {
defs = append(defs, nodes[i].Defs...)
}
So(defs, ShouldHaveLength, 2*len(org1Series))
})
})
}
}

func TestFind(t *testing.T) {
Expand Down Expand Up @@ -673,7 +687,12 @@ func TestPruneTaggedSeriesWithCollidingTagSets(t *testing.T) {
Convey("After purge", t, func() {
nodes, err := ix.FindByTag(1, findExpressions, 0)
So(err, ShouldBeNil)
So(nodes, ShouldHaveLength, 2)
So(nodes, ShouldHaveLength, 1)
defs := make([]idx.Archive, 0, len(nodes))
for i := range nodes {
defs = append(defs, nodes[i].Defs...)
}
So(defs, ShouldHaveLength, 2)
})

Convey("When purging newer series", t, func() {
Expand Down