8
8
"github.com/gocql/gocql"
9
9
"github.com/grafana/metrictank/expr/tagquery"
10
10
"github.com/grafana/metrictank/idx/memory"
11
+ "github.com/grafana/metrictank/idx/metatags"
11
12
log "github.com/sirupsen/logrus"
12
13
)
13
14
@@ -22,78 +23,15 @@ var (
22
23
Max : time .Second * time .Duration (20 ),
23
24
}
24
25
25
- // this batch id is used if we handle an upsert request for an org that has
26
- // no current batch
27
- defaultBatchId = gocql.UUID {}
28
-
29
- errIdxUpdatesDisabled = fmt .Errorf ("Cassandra index updates are disabled" )
30
- errMetaTagSupportDisabled = fmt .Errorf ("Meta tag support is not enabled" )
26
+ errIdxUpdatesDisabled = fmt .Errorf ("Cassandra index updates are disabled" )
31
27
)
32
28
33
- type metaRecordStatusByOrg struct {
34
- byOrg map [uint32 ]metaRecordStatus
35
- }
36
-
37
- func newMetaRecordStatusByOrg () metaRecordStatusByOrg {
38
- return metaRecordStatusByOrg {byOrg : make (map [uint32 ]metaRecordStatus )}
39
- }
40
-
41
- type metaRecordStatus struct {
42
- batchId gocql.UUID
43
- createdAt uint64
44
- lastUpdate uint64
45
- }
46
-
47
- // update takes the properties describing a batch of meta records and updates its internal status if necessary
48
- // it returns a boolean indicating whether a reload of the meta records is necessary and
49
- // if it is then the second returned value is the batch id that needs to be loaded
50
- func (m * metaRecordStatusByOrg ) update (orgId uint32 , newBatch gocql.UUID , newCreatedAt , newLastUpdate uint64 ) (bool , gocql.UUID ) {
51
- status , ok := m .byOrg [orgId ]
52
- if ! ok {
53
- m .byOrg [orgId ] = metaRecordStatus {
54
- batchId : newBatch ,
55
- createdAt : newCreatedAt ,
56
- lastUpdate : newLastUpdate ,
57
- }
58
- return true , newBatch
59
- }
60
-
61
- // if the current batch has been created at a time before the new batch,
62
- // then we want to make the new batch the current one and load its records
63
- if status .batchId != newBatch && status .createdAt < newCreatedAt {
64
- status .batchId = newBatch
65
- status .createdAt = newCreatedAt
66
- status .lastUpdate = newLastUpdate
67
- m .byOrg [orgId ] = status
68
- return true , status .batchId
69
- }
70
-
71
- // if the current batch is the same as the new batch, but their last update times
72
- // differ, then we want to reload that batch
73
- if status .batchId == newBatch && status .lastUpdate != newLastUpdate {
74
- status .lastUpdate = newLastUpdate
75
- m .byOrg [orgId ] = status
76
- return true , status .batchId
77
- }
78
-
79
- return false , defaultBatchId
80
- }
81
-
82
- func (m * metaRecordStatusByOrg ) getStatus (orgId uint32 ) (gocql.UUID , uint64 , uint64 ) {
83
- status , ok := m .byOrg [orgId ]
84
- if ! ok {
85
- return defaultBatchId , 0 , 0
86
- }
87
-
88
- return status .batchId , status .createdAt , status .lastUpdate
89
- }
90
-
91
29
func (c * CasIdx ) initMetaRecords (session * gocql.Session ) error {
92
30
if ! memory .MetaTagSupport || ! memory .TagSupport {
93
31
return nil
94
32
}
95
33
96
- c .metaRecords = newMetaRecordStatusByOrg ()
34
+ c .metaRecords = metatags . NewMetaRecordStatusByOrg ()
97
35
98
36
err := c .EnsureTableExists (session , c .Config .SchemaFile , "schema_meta_record_table" , c .Config .MetaRecordTable )
99
37
if err != nil {
@@ -125,9 +63,9 @@ func (c *CasIdx) loadMetaRecords() {
125
63
var createdAt , lastUpdate uint64
126
64
toLoad := make (map [uint32 ]gocql.UUID )
127
65
for iter .Scan (& batchId , & orgId , & createdAt , & lastUpdate ) {
128
- load , batchId := c .metaRecords .update (orgId , batchId , createdAt , lastUpdate )
66
+ load , batchId := c .metaRecords .Update (orgId , metatags . UUID ( batchId ) , createdAt , lastUpdate )
129
67
if load {
130
- toLoad [orgId ] = batchId
68
+ toLoad [orgId ] = gocql . UUID ( batchId )
131
69
}
132
70
}
133
71
@@ -195,8 +133,8 @@ func (c *CasIdx) pruneMetaRecords() {
195
133
continue
196
134
}
197
135
198
- currentBatchId , _ , _ := c .metaRecords .getStatus (orgId )
199
- if batchId != currentBatchId {
136
+ currentBatchId , _ , _ := c .metaRecords .GetStatus (orgId )
137
+ if batchId != gocql . UUID ( currentBatchId ) {
200
138
err := c .pruneBatch (orgId , batchId )
201
139
if err != nil {
202
140
log .Errorf ("Error when pruning batch %d/%s: %s" , orgId , batchId .String (), err )
@@ -231,15 +169,15 @@ func (c *CasIdx) pruneBatch(orgId uint32, batchId gocql.UUID) error {
231
169
232
170
func (c * CasIdx ) MetaTagRecordUpsert (orgId uint32 , record tagquery.MetaTagRecord ) error {
233
171
if ! memory .MetaTagSupport || ! memory .TagSupport {
234
- return errMetaTagSupportDisabled
172
+ return metatags . ErrMetaTagSupportDisabled
235
173
}
236
174
if ! c .Config .updateCassIdx {
237
175
return errIdxUpdatesDisabled
238
176
}
239
177
240
178
var err error
241
179
242
- batchId , _ , _ := c .metaRecords .getStatus (orgId )
180
+ batchId , _ , _ := c .metaRecords .GetStatus (orgId )
243
181
244
182
// if a record has no meta tags associated with it, then we delete it
245
183
if len (record .MetaTags ) > 0 {
@@ -262,15 +200,15 @@ func (c *CasIdx) MetaTagRecordUpsert(orgId uint32, record tagquery.MetaTagRecord
262
200
return nil
263
201
}
264
202
265
- func (c * CasIdx ) markMetaRecordBatchUpdated (orgId uint32 , batchId gocql .UUID ) error {
203
+ func (c * CasIdx ) markMetaRecordBatchUpdated (orgId uint32 , batchId metatags .UUID ) error {
266
204
session := c .Session .CurrentSession ()
267
205
now := time .Now ().UnixNano () / 1000000
268
- if batchId == defaultBatchId {
206
+ if batchId == metatags . DefaultBatchId {
269
207
qry := fmt .Sprintf ("INSERT INTO %s (orgid, batchid, createdat, lastupdate) VALUES (?, ?, ?, ?)" , c .Config .MetaRecordBatchTable )
270
208
return session .Query (
271
209
qry ,
272
210
orgId ,
273
- batchId ,
211
+ gocql . UUID ( batchId ) ,
274
212
0 ,
275
213
now ,
276
214
).RetryPolicy (& metaRecordRetryPolicy ).Exec ()
@@ -280,20 +218,20 @@ func (c *CasIdx) markMetaRecordBatchUpdated(orgId uint32, batchId gocql.UUID) er
280
218
return session .Query (
281
219
qry ,
282
220
orgId ,
283
- batchId ,
221
+ gocql . UUID ( batchId ) ,
284
222
now ,
285
223
).RetryPolicy (& metaRecordRetryPolicy ).Exec ()
286
224
}
287
225
288
226
func (c * CasIdx ) MetaTagRecordSwap (orgId uint32 , records []tagquery.MetaTagRecord ) error {
289
227
if ! memory .MetaTagSupport || ! memory .TagSupport {
290
- return errMetaTagSupportDisabled
228
+ return metatags . ErrMetaTagSupportDisabled
291
229
}
292
230
if ! c .Config .updateCassIdx {
293
231
return errIdxUpdatesDisabled
294
232
}
295
233
296
- newBatchId , err := gocql .RandomUUID ()
234
+ newBatchId , err := metatags .RandomUUID ()
297
235
if err != nil {
298
236
return fmt .Errorf ("Failed to generate new batch id" )
299
237
}
@@ -315,7 +253,7 @@ func (c *CasIdx) MetaTagRecordSwap(orgId uint32, records []tagquery.MetaTagRecor
315
253
session := c .Session .CurrentSession ()
316
254
err = session .Query (
317
255
qry ,
318
- newBatchId ,
256
+ gocql . UUID ( newBatchId ) ,
319
257
orgId ,
320
258
expressions ,
321
259
metaTags ,
@@ -328,20 +266,20 @@ func (c *CasIdx) MetaTagRecordSwap(orgId uint32, records []tagquery.MetaTagRecor
328
266
return c .createNewBatch (orgId , newBatchId )
329
267
}
330
268
331
- func (c * CasIdx ) createNewBatch (orgId uint32 , batchId gocql .UUID ) error {
269
+ func (c * CasIdx ) createNewBatch (orgId uint32 , batchId metatags .UUID ) error {
332
270
session := c .Session .CurrentSession ()
333
271
now := time .Now ().UnixNano () / 1000000
334
272
qry := fmt .Sprintf ("INSERT INTO %s (orgid, batchid, createdat, lastupdate) VALUES (?, ?, ?, ?)" , c .Config .MetaRecordBatchTable )
335
273
return session .Query (
336
274
qry ,
337
275
orgId ,
338
- batchId ,
276
+ gocql . UUID ( batchId ) ,
339
277
now ,
340
278
now ,
341
279
).RetryPolicy (& metaRecordRetryPolicy ).Exec ()
342
280
}
343
281
344
- func (c * CasIdx ) persistMetaRecord (orgId uint32 , batchId gocql .UUID , record tagquery.MetaTagRecord ) error {
282
+ func (c * CasIdx ) persistMetaRecord (orgId uint32 , batchId metatags .UUID , record tagquery.MetaTagRecord ) error {
345
283
expressions , err := record .Expressions .MarshalJSON ()
346
284
if err != nil {
347
285
return fmt .Errorf ("Failed to marshal expressions: %s" , err )
@@ -355,13 +293,13 @@ func (c *CasIdx) persistMetaRecord(orgId uint32, batchId gocql.UUID, record tagq
355
293
qry := fmt .Sprintf ("INSERT INTO %s (batchid, orgid, expressions, metatags) VALUES (?, ?, ?, ?)" , c .Config .MetaRecordTable )
356
294
return session .Query (
357
295
qry ,
358
- batchId ,
296
+ gocql . UUID ( batchId ) ,
359
297
orgId ,
360
298
expressions ,
361
299
metaTags ).RetryPolicy (& metaRecordRetryPolicy ).Exec ()
362
300
}
363
301
364
- func (c * CasIdx ) deleteMetaRecord (orgId uint32 , batchId gocql .UUID , record tagquery.MetaTagRecord ) error {
302
+ func (c * CasIdx ) deleteMetaRecord (orgId uint32 , batchId metatags .UUID , record tagquery.MetaTagRecord ) error {
365
303
expressions , err := record .Expressions .MarshalJSON ()
366
304
if err != nil {
367
305
return fmt .Errorf ("Failed to marshal record expressions: %s" , err )
@@ -371,7 +309,7 @@ func (c *CasIdx) deleteMetaRecord(orgId uint32, batchId gocql.UUID, record tagqu
371
309
qry := fmt .Sprintf ("DELETE FROM %s WHERE batchid=? AND orgid=? AND expressions=?" , c .Config .MetaRecordTable )
372
310
return session .Query (
373
311
qry ,
374
- batchId ,
312
+ gocql . UUID ( batchId ) ,
375
313
orgId ,
376
314
expressions ,
377
315
).RetryPolicy (& metaRecordRetryPolicy ).Exec ()
0 commit comments