Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 1009fbc

Browse files
committed
move meta records status to pkg idx
this type gets moved into the type idx because we'll also want to use it from the bigtable meta records index.
1 parent d37ddf3 commit 1009fbc

File tree

4 files changed

+164
-93
lines changed

4 files changed

+164
-93
lines changed

idx/cassandra/cassandra.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/grafana/metrictank/cluster"
1515
"github.com/grafana/metrictank/idx"
1616
"github.com/grafana/metrictank/idx/memory"
17+
"github.com/grafana/metrictank/idx/metatags"
1718
"github.com/grafana/metrictank/schema"
1819
"github.com/grafana/metrictank/stats"
1920
"github.com/grafana/metrictank/util"
@@ -62,7 +63,7 @@ type CasIdx struct {
6263
Config *IdxConfig
6364
cluster *gocql.ClusterConfig
6465
Session *cassandra.Session
65-
metaRecords metaRecordStatusByOrg
66+
metaRecords metatags.MetaRecordStatusByOrg
6667
writeQueue chan writeReq
6768
shutdown chan struct{}
6869
wg sync.WaitGroup

idx/cassandra/meta_records.go

+22-84
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/gocql/gocql"
99
"github.com/grafana/metrictank/expr/tagquery"
1010
"github.com/grafana/metrictank/idx/memory"
11+
"github.com/grafana/metrictank/idx/metatags"
1112
log "github.com/sirupsen/logrus"
1213
)
1314

@@ -22,78 +23,15 @@ var (
2223
Max: time.Second * time.Duration(20),
2324
}
2425

25-
// this batch id is used if we handle an upsert request for an org that has
26-
// no current batch
27-
defaultBatchId = gocql.UUID{}
28-
29-
errIdxUpdatesDisabled = fmt.Errorf("Cassandra index updates are disabled")
30-
errMetaTagSupportDisabled = fmt.Errorf("Meta tag support is not enabled")
26+
errIdxUpdatesDisabled = fmt.Errorf("Cassandra index updates are disabled")
3127
)
3228

33-
type metaRecordStatusByOrg struct {
34-
byOrg map[uint32]metaRecordStatus
35-
}
36-
37-
func newMetaRecordStatusByOrg() metaRecordStatusByOrg {
38-
return metaRecordStatusByOrg{byOrg: make(map[uint32]metaRecordStatus)}
39-
}
40-
41-
type metaRecordStatus struct {
42-
batchId gocql.UUID
43-
createdAt uint64
44-
lastUpdate uint64
45-
}
46-
47-
// update takes the properties describing a batch of meta records and updates its internal status if necessary
48-
// it returns a boolean indicating whether a reload of the meta records is necessary and
49-
// if it is then the second returned value is the batch id that needs to be loaded
50-
func (m *metaRecordStatusByOrg) update(orgId uint32, newBatch gocql.UUID, newCreatedAt, newLastUpdate uint64) (bool, gocql.UUID) {
51-
status, ok := m.byOrg[orgId]
52-
if !ok {
53-
m.byOrg[orgId] = metaRecordStatus{
54-
batchId: newBatch,
55-
createdAt: newCreatedAt,
56-
lastUpdate: newLastUpdate,
57-
}
58-
return true, newBatch
59-
}
60-
61-
// if the current batch has been created at a time before the new batch,
62-
// then we want to make the new batch the current one and load its records
63-
if status.batchId != newBatch && status.createdAt < newCreatedAt {
64-
status.batchId = newBatch
65-
status.createdAt = newCreatedAt
66-
status.lastUpdate = newLastUpdate
67-
m.byOrg[orgId] = status
68-
return true, status.batchId
69-
}
70-
71-
// if the current batch is the same as the new batch, but their last update times
72-
// differ, then we want to reload that batch
73-
if status.batchId == newBatch && status.lastUpdate != newLastUpdate {
74-
status.lastUpdate = newLastUpdate
75-
m.byOrg[orgId] = status
76-
return true, status.batchId
77-
}
78-
79-
return false, defaultBatchId
80-
}
81-
82-
func (m *metaRecordStatusByOrg) getStatus(orgId uint32) (gocql.UUID, uint64, uint64) {
83-
status, ok := m.byOrg[orgId]
84-
if !ok {
85-
return defaultBatchId, 0, 0
86-
}
87-
88-
return status.batchId, status.createdAt, status.lastUpdate
89-
}
90-
9129
func (c *CasIdx) initMetaRecords(session *gocql.Session) error {
9230
if !memory.MetaTagSupport || !memory.TagSupport {
9331
return nil
9432
}
9533

96-
c.metaRecords = newMetaRecordStatusByOrg()
34+
c.metaRecords = metatags.NewMetaRecordStatusByOrg()
9735

9836
err := c.EnsureTableExists(session, c.Config.SchemaFile, "schema_meta_record_table", c.Config.MetaRecordTable)
9937
if err != nil {
@@ -125,9 +63,9 @@ func (c *CasIdx) loadMetaRecords() {
12563
var createdAt, lastUpdate uint64
12664
toLoad := make(map[uint32]gocql.UUID)
12765
for iter.Scan(&batchId, &orgId, &createdAt, &lastUpdate) {
128-
load, batchId := c.metaRecords.update(orgId, batchId, createdAt, lastUpdate)
66+
load, batchId := c.metaRecords.Update(orgId, metatags.UUID(batchId), createdAt, lastUpdate)
12967
if load {
130-
toLoad[orgId] = batchId
68+
toLoad[orgId] = gocql.UUID(batchId)
13169
}
13270
}
13371

@@ -195,8 +133,8 @@ func (c *CasIdx) pruneMetaRecords() {
195133
continue
196134
}
197135

198-
currentBatchId, _, _ := c.metaRecords.getStatus(orgId)
199-
if batchId != currentBatchId {
136+
currentBatchId, _, _ := c.metaRecords.GetStatus(orgId)
137+
if batchId != gocql.UUID(currentBatchId) {
200138
err := c.pruneBatch(orgId, batchId)
201139
if err != nil {
202140
log.Errorf("Error when pruning batch %d/%s: %s", orgId, batchId.String(), err)
@@ -231,15 +169,15 @@ func (c *CasIdx) pruneBatch(orgId uint32, batchId gocql.UUID) error {
231169

232170
func (c *CasIdx) MetaTagRecordUpsert(orgId uint32, record tagquery.MetaTagRecord) error {
233171
if !memory.MetaTagSupport || !memory.TagSupport {
234-
return errMetaTagSupportDisabled
172+
return metatags.ErrMetaTagSupportDisabled
235173
}
236174
if !c.Config.updateCassIdx {
237175
return errIdxUpdatesDisabled
238176
}
239177

240178
var err error
241179

242-
batchId, _, _ := c.metaRecords.getStatus(orgId)
180+
batchId, _, _ := c.metaRecords.GetStatus(orgId)
243181

244182
// if a record has no meta tags associated with it, then we delete it
245183
if len(record.MetaTags) > 0 {
@@ -262,15 +200,15 @@ func (c *CasIdx) MetaTagRecordUpsert(orgId uint32, record tagquery.MetaTagRecord
262200
return nil
263201
}
264202

265-
func (c *CasIdx) markMetaRecordBatchUpdated(orgId uint32, batchId gocql.UUID) error {
203+
func (c *CasIdx) markMetaRecordBatchUpdated(orgId uint32, batchId metatags.UUID) error {
266204
session := c.Session.CurrentSession()
267205
now := time.Now().UnixNano() / 1000000
268-
if batchId == defaultBatchId {
206+
if batchId == metatags.DefaultBatchId {
269207
qry := fmt.Sprintf("INSERT INTO %s (orgid, batchid, createdat, lastupdate) VALUES (?, ?, ?, ?)", c.Config.MetaRecordBatchTable)
270208
return session.Query(
271209
qry,
272210
orgId,
273-
batchId,
211+
gocql.UUID(batchId),
274212
0,
275213
now,
276214
).RetryPolicy(&metaRecordRetryPolicy).Exec()
@@ -280,20 +218,20 @@ func (c *CasIdx) markMetaRecordBatchUpdated(orgId uint32, batchId gocql.UUID) er
280218
return session.Query(
281219
qry,
282220
orgId,
283-
batchId,
221+
gocql.UUID(batchId),
284222
now,
285223
).RetryPolicy(&metaRecordRetryPolicy).Exec()
286224
}
287225

288226
func (c *CasIdx) MetaTagRecordSwap(orgId uint32, records []tagquery.MetaTagRecord) error {
289227
if !memory.MetaTagSupport || !memory.TagSupport {
290-
return errMetaTagSupportDisabled
228+
return metatags.ErrMetaTagSupportDisabled
291229
}
292230
if !c.Config.updateCassIdx {
293231
return errIdxUpdatesDisabled
294232
}
295233

296-
newBatchId, err := gocql.RandomUUID()
234+
newBatchId, err := metatags.RandomUUID()
297235
if err != nil {
298236
return fmt.Errorf("Failed to generate new batch id")
299237
}
@@ -315,7 +253,7 @@ func (c *CasIdx) MetaTagRecordSwap(orgId uint32, records []tagquery.MetaTagRecor
315253
session := c.Session.CurrentSession()
316254
err = session.Query(
317255
qry,
318-
newBatchId,
256+
gocql.UUID(newBatchId),
319257
orgId,
320258
expressions,
321259
metaTags,
@@ -328,20 +266,20 @@ func (c *CasIdx) MetaTagRecordSwap(orgId uint32, records []tagquery.MetaTagRecor
328266
return c.createNewBatch(orgId, newBatchId)
329267
}
330268

331-
func (c *CasIdx) createNewBatch(orgId uint32, batchId gocql.UUID) error {
269+
func (c *CasIdx) createNewBatch(orgId uint32, batchId metatags.UUID) error {
332270
session := c.Session.CurrentSession()
333271
now := time.Now().UnixNano() / 1000000
334272
qry := fmt.Sprintf("INSERT INTO %s (orgid, batchid, createdat, lastupdate) VALUES (?, ?, ?, ?)", c.Config.MetaRecordBatchTable)
335273
return session.Query(
336274
qry,
337275
orgId,
338-
batchId,
276+
gocql.UUID(batchId),
339277
now,
340278
now,
341279
).RetryPolicy(&metaRecordRetryPolicy).Exec()
342280
}
343281

344-
func (c *CasIdx) persistMetaRecord(orgId uint32, batchId gocql.UUID, record tagquery.MetaTagRecord) error {
282+
func (c *CasIdx) persistMetaRecord(orgId uint32, batchId metatags.UUID, record tagquery.MetaTagRecord) error {
345283
expressions, err := record.Expressions.MarshalJSON()
346284
if err != nil {
347285
return fmt.Errorf("Failed to marshal expressions: %s", err)
@@ -355,13 +293,13 @@ func (c *CasIdx) persistMetaRecord(orgId uint32, batchId gocql.UUID, record tagq
355293
qry := fmt.Sprintf("INSERT INTO %s (batchid, orgid, expressions, metatags) VALUES (?, ?, ?, ?)", c.Config.MetaRecordTable)
356294
return session.Query(
357295
qry,
358-
batchId,
296+
gocql.UUID(batchId),
359297
orgId,
360298
expressions,
361299
metaTags).RetryPolicy(&metaRecordRetryPolicy).Exec()
362300
}
363301

364-
func (c *CasIdx) deleteMetaRecord(orgId uint32, batchId gocql.UUID, record tagquery.MetaTagRecord) error {
302+
func (c *CasIdx) deleteMetaRecord(orgId uint32, batchId metatags.UUID, record tagquery.MetaTagRecord) error {
365303
expressions, err := record.Expressions.MarshalJSON()
366304
if err != nil {
367305
return fmt.Errorf("Failed to marshal record expressions: %s", err)
@@ -371,7 +309,7 @@ func (c *CasIdx) deleteMetaRecord(orgId uint32, batchId gocql.UUID, record tagqu
371309
qry := fmt.Sprintf("DELETE FROM %s WHERE batchid=? AND orgid=? AND expressions=?", c.Config.MetaRecordTable)
372310
return session.Query(
373311
qry,
374-
batchId,
312+
gocql.UUID(batchId),
375313
orgId,
376314
expressions,
377315
).RetryPolicy(&metaRecordRetryPolicy).Exec()

idx/metatags/meta_record_status.go

+134
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package metatags
2+
3+
import (
4+
"crypto/rand"
5+
"fmt"
6+
"io"
7+
)
8+
9+
var (
10+
// this batch id is used if we handle an upsert request for an org that has
11+
// no current batch
12+
DefaultBatchId UUID
13+
14+
ErrMetaTagSupportDisabled = fmt.Errorf("Meta tag support is not enabled")
15+
)
16+
17+
type MetaRecordStatusByOrg struct {
18+
byOrg map[uint32]metaRecordStatus
19+
}
20+
21+
func NewMetaRecordStatusByOrg() MetaRecordStatusByOrg {
22+
return MetaRecordStatusByOrg{byOrg: make(map[uint32]metaRecordStatus)}
23+
}
24+
25+
type metaRecordStatus struct {
26+
batchId UUID
27+
createdAt uint64
28+
lastUpdate uint64
29+
}
30+
31+
// UUIDs are used as meta record batch IDs
32+
type UUID [16]byte
33+
34+
// RandomUUID generates a randomized UUID,
35+
// the code is copied from gocql.RandomUUID()
36+
func RandomUUID() (UUID, error) {
37+
var u UUID
38+
_, err := io.ReadFull(rand.Reader, u[:])
39+
if err != nil {
40+
return u, err
41+
}
42+
u[6] &= 0x0F // clear version
43+
u[6] |= 0x40 // set version to 4 (random uuid)
44+
u[8] &= 0x3F // clear variant
45+
u[8] |= 0x80 // set to IETF variant
46+
return u, nil
47+
}
48+
49+
// ParseUUID parses a 32 digit hexadecimal number (that might contain hypens)
50+
// representing an UUID.
51+
func ParseUUID(input string) (UUID, error) {
52+
var u UUID
53+
j := 0
54+
for _, r := range input {
55+
switch {
56+
case r == '-' && j&1 == 0:
57+
continue
58+
case r >= '0' && r <= '9' && j < 32:
59+
u[j/2] |= byte(r-'0') << uint(4-j&1*4)
60+
case r >= 'a' && r <= 'f' && j < 32:
61+
u[j/2] |= byte(r-'a'+10) << uint(4-j&1*4)
62+
case r >= 'A' && r <= 'F' && j < 32:
63+
u[j/2] |= byte(r-'A'+10) << uint(4-j&1*4)
64+
default:
65+
return UUID{}, fmt.Errorf("invalid UUID %q", input)
66+
}
67+
j += 1
68+
}
69+
if j != 32 {
70+
return UUID{}, fmt.Errorf("invalid UUID %q", input)
71+
}
72+
return u, nil
73+
}
74+
75+
// String returns the UUID in it's canonical form, a 32 digit hexadecimal
76+
// number in the form of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.
77+
func (u UUID) String() string {
78+
var offsets = [...]int{0, 2, 4, 6, 9, 11, 14, 16, 19, 21, 24, 26, 28, 30, 32, 34}
79+
const hexString = "0123456789abcdef"
80+
r := make([]byte, 36)
81+
for i, b := range u {
82+
r[offsets[i]] = hexString[b>>4]
83+
r[offsets[i]+1] = hexString[b&0xF]
84+
}
85+
r[8] = '-'
86+
r[13] = '-'
87+
r[18] = '-'
88+
r[23] = '-'
89+
return string(r)
90+
}
91+
92+
// update takes the properties describing a batch of meta records and updates its internal status if necessary
93+
// it returns a boolean indicating whether a reload of the meta records is necessary and
94+
// if it is then the second returned value is the batch id that needs to be loaded
95+
func (m *MetaRecordStatusByOrg) Update(orgId uint32, newBatch UUID, newCreatedAt, newLastUpdate uint64) (bool, UUID) {
96+
status, ok := m.byOrg[orgId]
97+
if !ok {
98+
m.byOrg[orgId] = metaRecordStatus{
99+
batchId: newBatch,
100+
createdAt: newCreatedAt,
101+
lastUpdate: newLastUpdate,
102+
}
103+
return true, newBatch
104+
}
105+
106+
// if the current batch has been created at a time before the new batch,
107+
// then we want to make the new batch the current one and load its records
108+
if status.batchId != newBatch && status.createdAt < newCreatedAt {
109+
status.batchId = newBatch
110+
status.createdAt = newCreatedAt
111+
status.lastUpdate = newLastUpdate
112+
m.byOrg[orgId] = status
113+
return true, status.batchId
114+
}
115+
116+
// if the current batch is the same as the new batch, but their last update times
117+
// differ, then we want to reload that batch
118+
if status.batchId == newBatch && status.lastUpdate != newLastUpdate {
119+
status.lastUpdate = newLastUpdate
120+
m.byOrg[orgId] = status
121+
return true, status.batchId
122+
}
123+
124+
return false, DefaultBatchId
125+
}
126+
127+
func (m *MetaRecordStatusByOrg) GetStatus(orgId uint32) (UUID, uint64, uint64) {
128+
status, ok := m.byOrg[orgId]
129+
if !ok {
130+
return DefaultBatchId, 0, 0
131+
}
132+
133+
return status.batchId, status.createdAt, status.lastUpdate
134+
}

0 commit comments

Comments
 (0)