Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 54b0708

Browse files
committed
fix meta record up for merge
1 parent 2e82c20 commit 54b0708

14 files changed

+1789
-515
lines changed

api/cluster.go

+32-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/grafana/metrictank/api/models"
1414
"github.com/grafana/metrictank/api/response"
1515
"github.com/grafana/metrictank/cluster"
16+
"github.com/grafana/metrictank/idx"
1617
"github.com/grafana/metrictank/stats"
1718
log "github.com/sirupsen/logrus"
1819
"github.com/tinylib/msgp/msgp"
@@ -314,7 +315,7 @@ type PeerResponse struct {
314315
// data: request to be submitted
315316
// name: name to be used in logging & tracing
316317
// path: path to request on
317-
func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {
318+
func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, map[string]error) {
318319

319320
peers := cluster.Manager.MemberList(false, true)
320321
log.Debugf("HTTP %s across %d instances", name, len(peers)-1)
@@ -353,14 +354,16 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
353354
}()
354355

355356
result := make(map[string]PeerResponse)
357+
errors := make(map[string]error)
356358
for resp := range responses {
357359
if resp.err != nil {
358-
return nil, resp.err
360+
errors[resp.data.peer.GetName()] = resp.err
361+
} else {
362+
result[resp.data.peer.GetName()] = resp.data
359363
}
360-
result[resp.data.peer.GetName()] = resp.data
361364
}
362365

363-
return result, nil
366+
return result, errors
364367
}
365368

366369
// peerQuerySpeculative takes a request and the path to request it on, then fans it out
@@ -505,3 +508,28 @@ func (s *Server) peerQuerySpeculativeChan(ctx context.Context, data cluster.Trac
505508

506509
return resultChan, errorChan
507510
}
511+
512+
func (s *Server) indexMetaTagRecordUpsert(ctx *middleware.Context, req models.IndexMetaTagRecordUpsert) {
513+
if s.MetricIndex == nil {
514+
response.Write(ctx, response.NewMsgp(200, &models.IndexFindByTagResp{}))
515+
return
516+
}
517+
518+
record := idx.MetaTagRecord{
519+
MetaTags: req.MetaTags,
520+
Queries: req.Queries,
521+
}
522+
523+
result, created, err := s.MetricIndex.MetaTagRecordUpsert(req.OrgId, record)
524+
if err != nil {
525+
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
526+
return
527+
}
528+
529+
response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordUpsertResult{
530+
MetaTags: result.MetaTags,
531+
Queries: result.Queries,
532+
ID: result.ID,
533+
Created: created,
534+
}))
535+
}

api/graphite.go

+67-10
Original file line numberDiff line numberDiff line change
@@ -1115,16 +1115,18 @@ func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.Gr
11151115
}
11161116

11171117
data := models.IndexTagDelSeries{OrgId: ctx.OrgId, Paths: request.Paths}
1118-
responses, err := s.peerQuery(ctx.Req.Context(), data, "clusterTagDelSeries,", "/index/tags/delSeries")
1119-
if err != nil {
1118+
responses, errors := s.peerQuery(ctx.Req.Context(), data, "clusterTagDelSeries,", "/index/tags/delSeries")
1119+
1120+
// if there are any errors, write one of them and return
1121+
for _, err := range errors {
11201122
response.Write(ctx, response.WrapErrorForTagDB(err))
11211123
return
11221124
}
11231125

11241126
res.Peers = make(map[string]int, len(responses))
11251127
peerResp := models.IndexTagDelSeriesResp{}
11261128
for peer, resp := range responses {
1127-
_, err = peerResp.UnmarshalMsg(resp.buf)
1129+
_, err := peerResp.UnmarshalMsg(resp.buf)
11281130
if err != nil {
11291131
response.Write(ctx, response.WrapErrorForTagDB(err))
11301132
return
@@ -1191,16 +1193,71 @@ func (s *Server) getMetaTagRecord(ctx *middleware.Context) {
11911193
response.Write(ctx, response.NewJson(200, metaTagRecords, ""))
11921194
}
11931195

1194-
func (s *Server) metaTagRecordUpsert(ctx *middleware.Context, metaTagRecord models.MetaTagRecord) {
1196+
func (s *Server) metaTagRecordUpsert(ctx *middleware.Context, upsertRequest models.MetaTagRecordUpsert) {
11951197
record := idx.MetaTagRecord{
1196-
MetaTags: metaTagRecord.MetaTags,
1197-
Queries: metaTagRecord.TagQueries,
1198+
MetaTags: upsertRequest.MetaTags,
1199+
Queries: upsertRequest.Queries,
11981200
}
11991201

1200-
result, err := s.MetricIndex.MetaTagRecordUpsert(ctx.OrgId, record)
1201-
if err != nil {
1202-
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
1202+
var localResult idx.MetaTagRecord
1203+
var created bool
1204+
if s.MetricIndex != nil {
1205+
var err error
1206+
localResult, created, err = s.MetricIndex.MetaTagRecordUpsert(ctx.OrgId, record)
1207+
if err != nil {
1208+
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
1209+
return
1210+
}
1211+
1212+
if !upsertRequest.Propagate {
1213+
response.Write(ctx, response.NewJson(200, models.MetaTagRecordUpsertResult{
1214+
MetaTags: localResult.MetaTags,
1215+
Queries: localResult.Queries,
1216+
ID: localResult.ID,
1217+
Created: created,
1218+
}, ""))
1219+
return
1220+
}
1221+
} else if !upsertRequest.Propagate {
1222+
return
12031223
}
12041224

1205-
response.Write(ctx, response.NewJson(200, result, ""))
1225+
indexUpsertRequest := models.IndexMetaTagRecordUpsert{
1226+
OrgId: ctx.OrgId,
1227+
MetaTags: upsertRequest.MetaTags,
1228+
Queries: upsertRequest.Queries,
1229+
}
1230+
1231+
results, errors := s.peerQuery(ctx.Req.Context(), indexUpsertRequest, "metaTagRecordUpsert", "/index/metaTags/upsert")
1232+
1233+
res := models.MetaTagRecordUpsertResultByNode{
1234+
Local: models.MetaTagRecordUpsertResult{
1235+
MetaTags: localResult.MetaTags,
1236+
Queries: localResult.Queries,
1237+
ID: localResult.ID,
1238+
Created: created,
1239+
},
1240+
}
1241+
1242+
if len(errors) > 0 {
1243+
res.PeerErrors = make(map[string]string, len(errors))
1244+
for peer, err := range errors {
1245+
res.PeerErrors[peer] = err.Error()
1246+
}
1247+
}
1248+
1249+
if len(results) > 0 {
1250+
res.PeerResults = make(map[string]models.MetaTagRecordUpsertResult, len(results))
1251+
for peer, resp := range results {
1252+
peerResp := models.MetaTagRecordUpsertResult{}
1253+
_, err := peerResp.UnmarshalMsg(resp.buf)
1254+
if err != nil {
1255+
res.PeerErrors[peer] = fmt.Sprintf("Error when unmarshaling response: %s", err.Error())
1256+
continue
1257+
}
1258+
res.PeerResults[peer] = peerResp
1259+
}
1260+
}
1261+
1262+
response.Write(ctx, response.NewJson(200, res, ""))
12061263
}

api/models/graphite.go

-5
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,6 @@ type GraphiteFind struct {
151151
Jsonp string `json:"jsonp" form:"jsonp"`
152152
}
153153

154-
type MetaTagRecord struct {
155-
MetaTags []string
156-
TagQueries []string
157-
}
158-
159154
type MetricsDelete struct {
160155
Query string `json:"query" form:"query" binding:"Required"`
161156
}

api/models/graphite_gen.go

-207
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)