Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 557eea9

Browse files
authored
Merge pull request #1471 from grafana/persist_meta_records
Persist meta records to Cassandra index
2 parents 09dc3b5 + 7be06c4 commit 557eea9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1159
-3697
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## breaking changes
44

5+
* as of v0.13.0-75-geaac736a Metrictank requires two new Cassandra tables if the meta tag feature is enabled and the Cassandra index is used. It only creates them automatically if `cassandra-idx-create-keyspace` is set to true.
56
* as of v0.12.0-404-gc7715cb2 we clean up poorly formatted graphite metrics better. To the extent that they have previously worked, queries may need some adjusting
67
#1435
78
* version v0.12.0-96-g998933c3 introduces config options for the cassandra/scylladb index table names.

api/cluster.go

+1-51
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func (s *Server) indexTagDelSeries(ctx *middleware.Context, request models.Index
267267
expressions := make(tagquery.Expressions, len(tags))
268268
builder := strings.Builder{}
269269
for i := range tags {
270-
tags[i].StringIntoBuilder(&builder)
270+
tags[i].StringIntoWriter(&builder)
271271

272272
expressions[i], err = tagquery.ParseExpression(builder.String())
273273
if err != nil {
@@ -606,53 +606,3 @@ func (s *Server) peerQuerySpeculativeChan(ctx context.Context, data cluster.Trac
606606

607607
return resultChan, errorChan
608608
}
609-
610-
func (s *Server) indexMetaTagRecordUpsert(ctx *middleware.Context, req models.IndexMetaTagRecordUpsert) {
611-
if s.MetricIndex == nil {
612-
response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordUpsertResult{}))
613-
return
614-
}
615-
616-
record, err := tagquery.ParseMetaTagRecord(req.MetaTags, req.Expressions)
617-
if err != nil {
618-
response.Write(ctx, response.WrapError(err))
619-
return
620-
}
621-
622-
result, created, err := s.MetricIndex.MetaTagRecordUpsert(req.OrgId, record)
623-
if err != nil {
624-
response.Write(ctx, response.WrapError(err))
625-
return
626-
}
627-
628-
response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordUpsertResult{
629-
MetaTags: result.MetaTags.Strings(),
630-
Expressions: result.Expressions.Strings(),
631-
Created: created,
632-
}))
633-
}
634-
635-
func (s *Server) indexMetaTagRecordSwap(ctx *middleware.Context, req models.IndexMetaTagRecordSwap) {
636-
if s.MetricIndex == nil {
637-
response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordSwapResult{}))
638-
return
639-
}
640-
641-
metaTagRecords := make([]tagquery.MetaTagRecord, len(req.Records))
642-
for i, rawRecord := range req.Records {
643-
var err error
644-
metaTagRecords[i], err = tagquery.ParseMetaTagRecord(rawRecord.MetaTags, rawRecord.Expressions)
645-
if err != nil {
646-
response.Write(ctx, response.WrapError(fmt.Errorf("Error when parsing record %d: %s", i, err)))
647-
return
648-
}
649-
}
650-
651-
added, deleted, err := s.MetricIndex.MetaTagRecordSwap(req.OrgId, metaTagRecords)
652-
if err != nil {
653-
response.Write(ctx, response.WrapError(fmt.Errorf("Error when swapping meta tag records: %s", err)))
654-
return
655-
}
656-
657-
response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordSwapResult{Added: added, Deleted: deleted}))
658-
}

api/graphite.go

+20-119
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,7 @@ func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.Gr
12411241
expressions := make(tagquery.Expressions, len(tags))
12421242
builder := strings.Builder{}
12431243
for i := range tags {
1244-
tags[i].StringIntoBuilder(&builder)
1244+
tags[i].StringIntoWriter(&builder)
12451245

12461246
expressions[i], err = tagquery.ParseExpression(builder.String())
12471247
if err != nil {
@@ -1351,146 +1351,47 @@ func (s *Server) getMetaTagRecords(ctx *middleware.Context) {
13511351
}
13521352

13531353
func (s *Server) metaTagRecordUpsert(ctx *middleware.Context, upsertRequest models.MetaTagRecordUpsert) {
1354+
if s.MetricIndex == nil {
1355+
response.Write(ctx, response.WrapError(fmt.Errorf("No metric index present")))
1356+
return
1357+
}
1358+
13541359
record, err := tagquery.ParseMetaTagRecord(upsertRequest.MetaTags, upsertRequest.Expressions)
13551360
if err != nil {
13561361
response.Write(ctx, response.WrapError(err))
13571362
return
13581363
}
13591364

1360-
var localResult tagquery.MetaTagRecord
1361-
var created bool
1362-
if s.MetricIndex != nil {
1363-
var err error
1364-
localResult, created, err = s.MetricIndex.MetaTagRecordUpsert(ctx.OrgId, record)
1365-
if err != nil {
1366-
response.Write(ctx, response.WrapError(err))
1367-
return
1368-
}
1369-
1370-
if !upsertRequest.Propagate {
1371-
response.Write(ctx, response.NewJson(200, models.MetaTagRecordUpsertResult{
1372-
MetaTags: localResult.MetaTags.Strings(),
1373-
Expressions: localResult.Expressions.Strings(),
1374-
Created: created,
1375-
}, ""))
1376-
return
1377-
}
1378-
} else if !upsertRequest.Propagate {
1365+
err = s.MetricIndex.MetaTagRecordUpsert(ctx.OrgId, record)
1366+
if err != nil {
1367+
response.Write(ctx, response.WrapError(err))
13791368
return
13801369
}
13811370

1382-
res := models.MetaTagRecordUpsertResultByNode{
1383-
Local: models.MetaTagRecordUpsertResult{
1384-
MetaTags: localResult.MetaTags.Strings(),
1385-
Expressions: localResult.Expressions.Strings(),
1386-
Created: created,
1387-
},
1388-
}
1389-
1390-
indexUpsertRequest := models.IndexMetaTagRecordUpsert{
1391-
OrgId: ctx.OrgId,
1392-
MetaTags: upsertRequest.MetaTags,
1393-
Expressions: upsertRequest.Expressions,
1394-
}
1395-
1396-
results, errors := s.peerQuery(ctx.Req.Context(), indexUpsertRequest, "metaTagRecordUpsert", "/index/metaTags/upsert")
1397-
1398-
if len(errors) > 0 {
1399-
res.PeerErrors = make(map[string]string, len(errors))
1400-
for peer, err := range errors {
1401-
res.PeerErrors[peer] = err.Error()
1402-
}
1403-
}
1404-
1405-
if len(results) > 0 {
1406-
res.PeerResults = make(map[string]models.MetaTagRecordUpsertResult, len(results))
1407-
for peer, resp := range results {
1408-
peerResp := models.MetaTagRecordUpsertResult{}
1409-
_, err := peerResp.UnmarshalMsg(resp.buf)
1410-
if err != nil {
1411-
res.PeerErrors[peer] = fmt.Sprintf("Error when unmarshaling response: %s", err.Error())
1412-
continue
1413-
}
1414-
res.PeerResults[peer] = peerResp
1415-
}
1416-
}
1417-
1418-
if len(errors) > 0 {
1419-
response.Write(ctx, response.NewJson(500, res, ""))
1420-
} else {
1421-
response.Write(ctx, response.NewJson(200, res, ""))
1422-
}
1371+
response.Write(ctx, response.NewJson(200, struct{ Status string }{Status: "OK"}, ""))
14231372
}
14241373

14251374
func (s *Server) metaTagRecordSwap(ctx *middleware.Context, swapRequest models.MetaTagRecordSwap) {
1375+
if s.MetricIndex == nil {
1376+
response.Write(ctx, response.WrapError(fmt.Errorf("No metric index present")))
1377+
return
1378+
}
1379+
1380+
var err error
14261381
metaTagRecords := make([]tagquery.MetaTagRecord, len(swapRequest.Records))
14271382
for i, rawRecord := range swapRequest.Records {
1428-
var err error
14291383
metaTagRecords[i], err = tagquery.ParseMetaTagRecord(rawRecord.MetaTags, rawRecord.Expressions)
14301384
if err != nil {
14311385
response.Write(ctx, response.WrapError(fmt.Errorf("Error when parsing record %d: %s", i, err)))
14321386
return
14331387
}
14341388
}
14351389

1436-
var added, deleted uint32
1437-
if s.MetricIndex != nil {
1438-
var err error
1439-
added, deleted, err = s.MetricIndex.MetaTagRecordSwap(ctx.OrgId, metaTagRecords)
1440-
if err != nil {
1441-
response.Write(ctx, response.WrapError(err))
1442-
return
1443-
}
1444-
1445-
if !swapRequest.Propagate {
1446-
response.Write(ctx, response.NewJson(200, models.MetaTagRecordSwapResult{
1447-
Added: added,
1448-
Deleted: deleted,
1449-
}, ""))
1450-
return
1451-
}
1452-
} else if !swapRequest.Propagate {
1453-
response.Write(ctx, response.NewJson(200, models.MetaTagRecordSwapResult{}, ""))
1390+
err = s.MetricIndex.MetaTagRecordSwap(ctx.OrgId, metaTagRecords)
1391+
if err != nil {
1392+
response.Write(ctx, response.WrapError(err))
14541393
return
14551394
}
14561395

1457-
res := models.MetaTagRecordSwapResultByNode{
1458-
Local: models.MetaTagRecordSwapResult{
1459-
Added: added,
1460-
Deleted: deleted,
1461-
},
1462-
}
1463-
1464-
indexSwapRequest := models.IndexMetaTagRecordSwap{
1465-
OrgId: ctx.OrgId,
1466-
Records: swapRequest.Records,
1467-
}
1468-
1469-
results, errors := s.peerQuery(ctx.Req.Context(), indexSwapRequest, "metaTagRecordSwap", "/index/metaTags/swap")
1470-
1471-
if len(errors) > 0 {
1472-
res.PeerErrors = make(map[string]string, len(errors))
1473-
for peer, err := range errors {
1474-
res.PeerErrors[peer] = err.Error()
1475-
}
1476-
}
1477-
1478-
if len(results) > 0 {
1479-
res.PeerResults = make(map[string]models.MetaTagRecordSwapResult, len(results))
1480-
for peer, resp := range results {
1481-
peerResp := models.MetaTagRecordSwapResult{}
1482-
_, err := peerResp.UnmarshalMsg(resp.buf)
1483-
if err != nil {
1484-
res.PeerErrors[peer] = fmt.Sprintf("Error when unmarshaling response: %s", err.Error())
1485-
continue
1486-
}
1487-
res.PeerResults[peer] = peerResp
1488-
}
1489-
}
1490-
1491-
if len(errors) > 0 {
1492-
response.Write(ctx, response.NewJson(500, res, ""))
1493-
} else {
1494-
response.Write(ctx, response.NewJson(200, res, ""))
1495-
}
1396+
response.Write(ctx, response.NewJson(200, struct{ Status string }{Status: "OK"}, ""))
14961397
}

api/models/meta_records.go

+1-62
Original file line numberDiff line numberDiff line change
@@ -7,57 +7,23 @@ import (
77
traceLog "github.com/opentracing/opentracing-go/log"
88
)
99

10-
//go:generate msgp
11-
1210
type MetaTagRecordUpsert struct {
1311
MetaTags []string `json:"metaTags"`
1412
Expressions []string `json:"expressions" binding:"Required"`
15-
Propagate bool `json:"propagate"`
1613
}
1714

1815
func (m MetaTagRecordUpsert) Trace(span opentracing.Span) {
1916
span.LogFields(
2017
traceLog.String("metaTags", fmt.Sprintf("%q", m.MetaTags)),
2118
traceLog.String("expressions", fmt.Sprintf("%q", m.Expressions)),
22-
traceLog.Bool("propagate", m.Propagate),
2319
)
2420
}
2521

2622
func (m MetaTagRecordUpsert) TraceDebug(span opentracing.Span) {
2723
}
2824

29-
type MetaTagRecordUpsertResultByNode struct {
30-
Local MetaTagRecordUpsertResult
31-
PeerResults map[string]MetaTagRecordUpsertResult `json:"peerResults"`
32-
PeerErrors map[string]string `json:"peerErrors"`
33-
}
34-
35-
type MetaTagRecordUpsertResult struct {
36-
MetaTags []string `json:"metaTags"`
37-
Expressions []string `json:"expressions"`
38-
Created bool `json:"created"`
39-
}
40-
41-
type IndexMetaTagRecordUpsert struct {
42-
OrgId uint32 `json:"orgId" binding:"Required"`
43-
MetaTags []string `json:"metaTags"`
44-
Expressions []string `json:"expressions" binding:"Required"`
45-
}
46-
47-
func (m IndexMetaTagRecordUpsert) Trace(span opentracing.Span) {
48-
span.SetTag("orgId", m.OrgId)
49-
span.LogFields(
50-
traceLog.String("metaTags", fmt.Sprintf("%q", m.MetaTags)),
51-
traceLog.String("expressions", fmt.Sprintf("%q", m.Expressions)),
52-
)
53-
}
54-
55-
func (m IndexMetaTagRecordUpsert) TraceDebug(span opentracing.Span) {
56-
}
57-
5825
type MetaTagRecordSwap struct {
59-
Records []MetaTagRecord `json:"records"`
60-
Propagate bool `json:"propagate"`
26+
Records []MetaTagRecord `json:"records"`
6127
}
6228

6329
type MetaTagRecord struct {
@@ -68,35 +34,8 @@ type MetaTagRecord struct {
6834
func (m MetaTagRecordSwap) Trace(span opentracing.Span) {
6935
span.LogFields(
7036
traceLog.Uint32("recordCount", uint32(len(m.Records))),
71-
traceLog.Bool("propagate", m.Propagate),
7237
)
7338
}
7439

7540
func (m MetaTagRecordSwap) TraceDebug(span opentracing.Span) {
7641
}
77-
78-
type MetaTagRecordSwapResultByNode struct {
79-
Local MetaTagRecordSwapResult `json:"local"`
80-
PeerResults map[string]MetaTagRecordSwapResult `json:"peerResults"`
81-
PeerErrors map[string]string `json:"peerErrors"`
82-
}
83-
84-
type MetaTagRecordSwapResult struct {
85-
Deleted uint32 `json:"deleted"`
86-
Added uint32 `json:"added"`
87-
}
88-
89-
type IndexMetaTagRecordSwap struct {
90-
OrgId uint32 `json:"orgId" binding:"Required"`
91-
Records []MetaTagRecord `json:"records"`
92-
}
93-
94-
func (m IndexMetaTagRecordSwap) Trace(span opentracing.Span) {
95-
span.SetTag("orgId", m.OrgId)
96-
span.LogFields(
97-
traceLog.Uint32("recordCount", uint32(len(m.Records))),
98-
)
99-
}
100-
101-
func (m IndexMetaTagRecordSwap) TraceDebug(span opentracing.Span) {
102-
}

0 commit comments

Comments
 (0)