Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit b853661

Browse files
author
woodsaj
committed
update idx handling
- support LastSave property, so we only periodically save to cassandra. - make adding to the writeQueue a non-blocking operation, unless the def has not been updated for 1.5x the updateInterval.
1 parent 02f4007 commit b853661

File tree

4 files changed

+223
-46
lines changed

4 files changed

+223
-46
lines changed

idx/cassandra/cassandra.go

+55-35
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package cassandra
33
import (
44
"flag"
55
"fmt"
6-
"math/rand"
76
"strings"
87
"sync"
98
"time"
@@ -80,7 +79,7 @@ var (
8079
pruneInterval time.Duration
8180
updateCassIdx bool
8281
updateInterval time.Duration
83-
updateFuzzyness float64
82+
updateInterval32 uint32
8483
)
8584

8685
func ConfigSetup() *flag.FlagSet {
@@ -95,7 +94,6 @@ func ConfigSetup() *flag.FlagSet {
9594
casIdx.IntVar(&writeQueueSize, "write-queue-size", 100000, "Max number of metricDefs allowed to be unwritten to cassandra")
9695
casIdx.BoolVar(&updateCassIdx, "update-cassandra-index", true, "synchronize index changes to cassandra. not all your nodes need to do this.")
9796
casIdx.DurationVar(&updateInterval, "update-interval", time.Hour*3, "frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates")
98-
casIdx.Float64Var(&updateFuzzyness, "update-fuzzyness", 0.5, "fuzzyness factor for update-interval. should be in the range 0 > fuzzyness <= 1. With an updateInterval of 4hours and fuzzyness of 0.5, metricDefs will be updated every 4-6hours.")
9997
casIdx.DurationVar(&maxStale, "max-stale", 0, "clear series from the index if they have not been seen for this much time.")
10098
casIdx.DurationVar(&pruneInterval, "prune-interval", time.Hour*3, "Interval at which the index should be checked for stale series.")
10199
casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use")
@@ -154,6 +152,7 @@ func New() *CasIdx {
154152
if updateCassIdx {
155153
idx.writeQueue = make(chan writeReq, writeQueueSize)
156154
}
155+
updateInterval32 = uint32(updateInterval.Nanoseconds() / int64(time.Second))
157156
return idx
158157
}
159158

@@ -236,46 +235,67 @@ func (c *CasIdx) Stop() {
236235
func (c *CasIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Archive {
237236
pre := time.Now()
238237
existing, inMemory := c.MemoryIdx.Get(data.Id)
239-
updateIdx := false
238+
archive := c.MemoryIdx.AddOrUpdate(data, partition)
240239
stat := statUpdateDuration
240+
if !inMemory {
241+
stat = statAddDuration
242+
}
243+
if !updateCassIdx {
244+
stat.Value(time.Since(pre))
245+
return archive
246+
}
241247

242-
if inMemory {
243-
if existing.Partition == partition {
244-
var oldest time.Time
245-
if updateInterval > 0 {
246-
oldest = time.Now().Add(-1 * updateInterval).Add(-1 * time.Duration(rand.Int63n(updateInterval.Nanoseconds()*int64(updateFuzzyness*100)/100)))
247-
} else {
248-
oldest = time.Now()
249-
}
250-
updateIdx = (existing.LastUpdate < oldest.Unix())
251-
} else {
252-
if updateCassIdx {
253-
// the partition of the metric has changed. So we need to delete
254-
// the current metricDef from cassandra. We do this in a separate
255-
// goroutine as we dont want to block waiting for the delete to succeed.
256-
go func() {
257-
if err := c.deleteDef(&existing); err != nil {
258-
log.Error(3, err.Error())
259-
}
260-
}()
248+
now := uint32(time.Now().Unix())
249+
250+
// Cassandra uses the partition id as the partitioning key, so an "update" that changes the partition for
251+
// an existing metricDef will just create a new row in the table and won't remove the old row.
252+
// So we need to explicitly delete the old entry.
253+
if inMemory && existing.Partition != partition {
254+
go func() {
255+
if err := c.deleteDef(&existing); err != nil {
256+
log.Error(3, err.Error())
261257
}
262-
updateIdx = true
263-
}
264-
} else {
265-
updateIdx = true
266-
stat = statAddDuration
258+
}()
267259
}
268260

269-
if updateIdx {
270-
archive := c.MemoryIdx.AddOrUpdate(data, partition)
271-
if updateCassIdx {
272-
log.Debug("cassandra-idx updating def in index.")
273-
c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
274-
}
261+
// check if we need to save to cassandra.
262+
if archive.LastSave >= (now - updateInterval32) {
275263
stat.Value(time.Since(pre))
276264
return archive
277265
}
278-
return existing
266+
267+
// This is just a safety precaution to prevent corrupt index entries.
268+
// This ensures that the index entry always contains the correct metricDefinition data.
269+
if inMemory {
270+
archive.MetricDefinition = *schema.MetricDefinitionFromMetricData(data)
271+
archive.MetricDefinition.Partition = partition
272+
}
273+
274+
// if the entry has not been saved for 1.5x updateInterval
275+
// then perform a blocking save. (bit shifting to the right 1 bit, divides by 2)
276+
if archive.LastSave < (now - updateInterval32 - (updateInterval32 >> 1)) {
277+
log.Debug("cassandra-idx updating def in index.")
278+
c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}
279+
archive.LastSave = now
280+
c.MemoryIdx.Update(archive)
281+
} else {
282+
// perform a non-blocking write to the writeQueue. If the queue is full, then
283+
// this will fail and we won't update the LastSave timestamp. The next time
284+
// the metric is seen, the previous lastSave timestamp will still be in place and so
285+
// we will try and save again. This will continue until we are successful or the
286+
// lastSave timestamp becomes more than 1.5 x UpdateInterval, in which case we will
287+
// do a blocking write to the queue.
288+
select {
289+
case c.writeQueue <- writeReq{recvTime: time.Now(), def: &archive.MetricDefinition}:
290+
archive.LastSave = now
291+
c.MemoryIdx.Update(archive)
292+
default:
293+
log.Debug("writeQueue is full, update not saved.")
294+
}
295+
}
296+
297+
stat.Value(time.Since(pre))
298+
return archive
279299
}
280300

281301
func (c *CasIdx) rebuildIndex() {

idx/cassandra/cassandra_test.go

+150-5
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,18 @@ func init() {
2222
numConns = 1
2323
writeQueueSize = 1000
2424
protoVer = 4
25+
updateCassIdx = false
2526

2627
cluster.Init("default", "test", time.Now(), "http", 6060)
2728
}
2829

30+
func initForTests(c *CasIdx) error {
31+
if err := c.MemoryIdx.Init(); err != nil {
32+
return err
33+
}
34+
return nil
35+
}
36+
2937
func getSeriesNames(depth, count int, prefix string) []string {
3038
series := make([]string, count)
3139
for i := 0; i < count; i++ {
@@ -71,7 +79,7 @@ func getMetricData(orgId, depth, count, interval int, prefix string) []*schema.M
7179

7280
func TestGetAddKey(t *testing.T) {
7381
ix := New()
74-
ix.Init()
82+
initForTests(ix)
7583

7684
publicSeries := getMetricData(-1, 2, 5, 10, "metric.public")
7785
org1Series := getMetricData(1, 2, 5, 10, "metric.org1")
@@ -108,9 +116,104 @@ func TestGetAddKey(t *testing.T) {
108116
})
109117
}
110118

119+
func TestAddToWriteQueue(t *testing.T) {
120+
originalUpdateCassIdx := updateCassIdx
121+
originalUpdateInterval := updateInterval
122+
originalWriteQSize := writeQueueSize
123+
124+
defer func() {
125+
updateCassIdx = originalUpdateCassIdx
126+
updateInterval = originalUpdateInterval
127+
writeQueueSize = originalWriteQSize
128+
}()
129+
130+
updateCassIdx = true
131+
updateInterval = 10
132+
writeQueueSize = 5
133+
ix := New()
134+
initForTests(ix)
135+
metrics := getMetricData(1, 2, 5, 10, "metric.demo")
136+
Convey("When writeQueue is enabled", t, func() {
137+
Convey("When new metrics being added", func() {
138+
for _, s := range metrics {
139+
ix.AddOrUpdate(s, 1)
140+
select {
141+
case wr := <-ix.writeQueue:
142+
So(wr.def.Id, ShouldEqual, s.Id)
143+
archive, inMem := ix.Get(wr.def.Id)
144+
So(inMem, ShouldBeTrue)
145+
now := uint32(time.Now().Unix())
146+
So(archive.LastSave, ShouldBeBetweenOrEqual, now-1, now+1)
147+
case <-time.After(time.Second):
148+
t.Fail()
149+
}
150+
}
151+
})
152+
Convey("When existing metrics are added and lastSave is recent", func() {
153+
for _, s := range metrics {
154+
s.Time = time.Now().Unix()
155+
ix.AddOrUpdate(s, 1)
156+
}
157+
wrCount := 0
158+
159+
LOOP_WR:
160+
for {
161+
select {
162+
case <-ix.writeQueue:
163+
wrCount++
164+
default:
165+
break LOOP_WR
166+
}
167+
}
168+
So(wrCount, ShouldEqual, 0)
169+
})
170+
Convey("When existing metrics are added and lastSave is old", func() {
171+
for _, s := range metrics {
172+
s.Time = time.Now().Unix()
173+
archive, _ := ix.Get(s.Id)
174+
archive.LastSave = uint32(time.Now().Unix() - 100)
175+
ix.Update(archive)
176+
}
177+
for _, s := range metrics {
178+
ix.AddOrUpdate(s, 1)
179+
select {
180+
case wr := <-ix.writeQueue:
181+
So(wr.def.Id, ShouldEqual, s.Id)
182+
archive, inMem := ix.Get(wr.def.Id)
183+
So(inMem, ShouldBeTrue)
184+
now := uint32(time.Now().Unix())
185+
So(archive.LastSave, ShouldBeBetweenOrEqual, now-1, now+1)
186+
case <-time.After(time.Second):
187+
t.Fail()
188+
}
189+
}
190+
})
191+
Convey("When new metrics are added and writeQueue is full", func() {
192+
newMetrics := getMetricData(1, 2, 6, 10, "metric.demo2")
193+
pre := time.Now()
194+
go func() {
195+
time.Sleep(time.Second)
196+
//drain the writeQueue
197+
for range ix.writeQueue {
198+
continue
199+
}
200+
}()
201+
202+
for _, s := range newMetrics {
203+
ix.AddOrUpdate(s, 1)
204+
}
205+
//it should take at least 1 second to add the defs, as the queue will be full
206+
// until the above goroutine empties it, leading to a blocking write.
207+
So(time.Now(), ShouldHappenAfter, pre.Add(time.Second))
208+
})
209+
})
210+
ix.MemoryIdx.Stop()
211+
close(ix.writeQueue)
212+
}
213+
111214
func TestFind(t *testing.T) {
112215
ix := New()
113-
ix.Init()
216+
initForTests(ix)
114217
for _, s := range getMetricData(-1, 2, 5, 10, "metric.demo") {
115218
ix.AddOrUpdate(s, 1)
116219
}
@@ -217,7 +320,7 @@ func BenchmarkIndexing(b *testing.B) {
217320
writeQueueSize = 10
218321
protoVer = 4
219322
updateInterval = time.Hour
220-
updateFuzzyness = 1.0
323+
updateCassIdx = true
221324
ix := New()
222325
tmpSession, err := ix.cluster.CreateSession()
223326
if err != nil {
@@ -262,7 +365,7 @@ func BenchmarkLoad(b *testing.B) {
262365
writeQueueSize = 10
263366
protoVer = 4
264367
updateInterval = time.Hour
265-
updateFuzzyness = 1.0
368+
updateCassIdx = true
266369
ix := New()
267370

268371
tmpSession, err := ix.cluster.CreateSession()
@@ -275,7 +378,6 @@ func BenchmarkLoad(b *testing.B) {
275378
if err != nil {
276379
b.Skipf("can't initialize cassandra: %s", err)
277380
}
278-
279381
insertDefs(ix, b.N)
280382
ix.Stop()
281383

@@ -285,3 +387,46 @@ func BenchmarkLoad(b *testing.B) {
285387
ix.Init()
286388
ix.Stop()
287389
}
390+
391+
func BenchmarkIndexingWithUpdates(b *testing.B) {
392+
cluster.Manager.SetPartitions([]int32{1})
393+
keyspace = "metrictank"
394+
hosts = "localhost:9042"
395+
consistency = "one"
396+
timeout = time.Second
397+
numConns = 10
398+
writeQueueSize = 10
399+
protoVer = 4
400+
updateInterval = time.Hour
401+
updateCassIdx = true
402+
ix := New()
403+
tmpSession, err := ix.cluster.CreateSession()
404+
if err != nil {
405+
b.Skipf("can't connect to cassandra: %s", err)
406+
}
407+
tmpSession.Query("TRUNCATE metrictank.metric_idx").Exec()
408+
tmpSession.Close()
409+
if err != nil {
410+
b.Skipf("can't connect to cassandra: %s", err)
411+
}
412+
ix.Init()
413+
insertDefs(ix, b.N)
414+
415+
b.ReportAllocs()
416+
b.ResetTimer()
417+
var series string
418+
var data *schema.MetricData
419+
for n := 0; n < b.N; n++ {
420+
series = "some.metric." + strconv.Itoa(n)
421+
data = &schema.MetricData{
422+
Name: series,
423+
Metric: series,
424+
Interval: 10,
425+
OrgId: 1,
426+
Time: 10,
427+
}
428+
data.SetId()
429+
ix.AddOrUpdate(data, 1)
430+
}
431+
ix.Stop()
432+
}

idx/idx.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,15 @@ type Archive struct {
2626
schema.MetricDefinition
2727
SchemaId uint16 // index in mdata.schemas (not persisted)
2828
AggId uint16 // index in mdata.aggregations (not persisted)
29+
LastSave uint32 // last time the metricDefinition was saved to a backend store (cassandra)
2930
}
3031

3132
// used primarily by tests, for convenience
3233
func NewArchiveBare(name string) Archive {
3334
return Archive{
34-
schema.MetricDefinition{
35+
MetricDefinition: schema.MetricDefinition{
3536
Name: name,
3637
},
37-
0,
38-
0,
3938
}
4039
}
4140

idx/memory/memory.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ func (m *MemoryIdx) AddOrUpdate(data *schema.MetricData, partition int32) idx.Ar
120120
return archive
121121
}
122122

123+
func (m *MemoryIdx) Update(entry idx.Archive) {
124+
m.Lock()
125+
if _, ok := m.DefById[entry.Id]; !ok {
126+
m.Unlock()
127+
return
128+
}
129+
*(m.DefById[entry.Id]) = entry
130+
m.Unlock()
131+
}
132+
123133
// Used to rebuild the index from an existing set of metricDefinitions.
124134
func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
125135
m.Lock()
@@ -132,6 +142,9 @@ func (m *MemoryIdx) Load(defs []schema.MetricDefinition) int {
132142
continue
133143
}
134144
m.add(def)
145+
// as we are loading the metricDefs from a persistent store, set the lastSave
146+
// to the lastUpdate timestamp.
147+
m.DefById[def.Id].LastSave = uint32(def.LastUpdate)
135148
num++
136149
statMetricsActive.Inc()
137150
statAddDuration.Value(time.Since(pre))
@@ -145,9 +158,9 @@ func (m *MemoryIdx) add(def *schema.MetricDefinition) idx.Archive {
145158
schemaId, _ := mdata.MatchSchema(def.Name)
146159
aggId, _ := mdata.MatchAgg(def.Name)
147160
archive := &idx.Archive{
148-
*def,
149-
schemaId,
150-
aggId,
161+
MetricDefinition: *def,
162+
SchemaId: schemaId,
163+
AggId: aggId,
151164
}
152165

153166
//first check to see if a tree has been created for this OrgId

0 commit comments

Comments
 (0)