Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 3a92e54

Browse files
authored
Merge pull request #21 from bloomberg/multiDef
Fix regressing timestamps, missing defs
2 parents 5a8688e + 09074f1 commit 3a92e54

File tree

4 files changed

+56
-22
lines changed

4 files changed

+56
-22
lines changed

api/dataprocessor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
170170
if len(remoteReqs) > 0 {
171171
wg.Add(1)
172172
go func() {
173-
// all errors returned returned are *response.Error.
173+
// all errors returned are *response.Error.
174174
series, err := s.getTargetsRemote(getCtx, remoteReqs)
175175
if err != nil {
176176
cancel()

api/graphite.go

+7-12
Original file line numberDiff line numberDiff line change
@@ -924,8 +924,6 @@ func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.G
924924
}
925925

926926
func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64) ([]Series, error) {
927-
seriesSet := make(map[string]Series)
928-
929927
result, err := s.MetricIndex.FindByTag(orgId, expressions, from)
930928
if err != nil {
931929
return nil, err
@@ -938,12 +936,14 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
938936
default:
939937
}
940938

939+
var allSeries []Series
940+
941941
for _, series := range result {
942-
seriesSet[series.Path] = Series{
942+
allSeries = append(allSeries, Series{
943943
Pattern: series.Path,
944944
Node: cluster.Manager.ThisNode(),
945945
Series: []idx.Node{series},
946-
}
946+
})
947947
}
948948

949949
data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
@@ -966,20 +966,15 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
966966
return nil, err
967967
}
968968
for _, series := range resp.Metrics {
969-
seriesSet[series.Path] = Series{
969+
allSeries = append(allSeries, Series{
970970
Pattern: series.Path,
971971
Node: r.peer,
972972
Series: []idx.Node{series},
973-
}
973+
})
974974
}
975975
}
976976

977-
series := make([]Series, 0, len(seriesSet))
978-
for _, s := range seriesSet {
979-
series = append(series, s)
980-
}
981-
982-
return series, nil
977+
return allSeries, nil
983978
}
984979

985980
func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTags) {

idx/memory/memory.go

+29-8
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,10 @@ func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archi
222222
if LogLevel < 2 {
223223
log.Debug("metricDef with id %v already in index", point.MKey)
224224
}
225-
existing.LastUpdate = int64(point.Time)
225+
226+
if existing.LastUpdate < int64(point.Time) {
227+
existing.LastUpdate = int64(point.Time)
228+
}
226229
existing.Partition = partition
227230
statUpdate.Inc()
228231
statUpdateDuration.Value(time.Since(pre))
@@ -244,7 +247,9 @@ func (m *MemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, parti
244247
if ok {
245248
oldPart := existing.Partition
246249
log.Debug("metricDef with id %s already in index.", mkey)
247-
existing.LastUpdate = data.Time
250+
if existing.LastUpdate < int64(data.Time) {
251+
existing.LastUpdate = int64(data.Time)
252+
}
248253
existing.Partition = partition
249254
statUpdate.Inc()
250255
statUpdateDuration.Value(time.Since(pre))
@@ -833,6 +838,7 @@ func (m *MemoryIdx) FindByTag(orgId uint32, expressions []string, from int64) ([
833838

834839
ids := m.idsByTagQuery(orgId, query)
835840
res := make([]idx.Node, 0, len(ids))
841+
seen := make(map[string]struct{})
836842
for id := range ids {
837843
def, ok := m.defById[id]
838844
if !ok {
@@ -841,12 +847,27 @@ func (m *MemoryIdx) FindByTag(orgId uint32, expressions []string, from int64) ([
841847
continue
842848
}
843849

844-
res = append(res, idx.Node{
845-
Path: def.NameWithTags(),
846-
Leaf: true,
847-
HasChildren: false,
848-
Defs: []idx.Archive{*def},
849-
})
850+
if _, ok := seen[def.NameWithTags()]; !ok {
851+
852+
defMap := m.defByTagSet.defs(orgId, def.NameWithTags())
853+
854+
defs := make([]idx.Archive, 0, len(defMap))
855+
856+
for d := range defMap {
857+
if from != 0 && def.LastUpdate < from {
858+
continue
859+
}
860+
defs = append(defs, *m.defById[d.Id])
861+
}
862+
863+
res = append(res, idx.Node{
864+
Path: def.NameWithTags(),
865+
Leaf: true,
866+
HasChildren: false,
867+
Defs: defs,
868+
})
869+
seen[def.NameWithTags()] = struct{}{}
870+
}
850871
}
851872
return res, nil
852873
}

idx/memory/memory_test.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,19 @@ func testGetAddKey(t *testing.T) {
126126
So(defs, ShouldHaveLength, 15)
127127
})
128128
})
129+
130+
if TagSupport {
131+
Convey("When adding metricDefs with the same series name as existing metricDefs (tagged)", t, func() {
132+
Convey("then findByTag", func() {
133+
nodes, _ := ix.FindByTag(1, []string{"name!="}, 0)
134+
defs := make([]idx.Archive, 0, len(nodes))
135+
for i := range nodes {
136+
defs = append(defs, nodes[i].Defs...)
137+
}
138+
So(defs, ShouldHaveLength, 10)
139+
})
140+
})
141+
}
129142
}
130143

131144
func TestFind(t *testing.T) {
@@ -673,7 +686,12 @@ func TestPruneTaggedSeriesWithCollidingTagSets(t *testing.T) {
673686
Convey("After purge", t, func() {
674687
nodes, err := ix.FindByTag(1, findExpressions, 0)
675688
So(err, ShouldBeNil)
676-
So(nodes, ShouldHaveLength, 2)
689+
So(nodes, ShouldHaveLength, 1)
690+
defs := make([]idx.Archive, 0, len(nodes))
691+
for i := range nodes {
692+
defs = append(defs, nodes[i].Defs...)
693+
}
694+
So(defs, ShouldHaveLength, 2)
677695
})
678696

679697
Convey("When purging newer series", t, func() {

0 commit comments

Comments
 (0)