Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 2029113

Browse files
committed
Get should not return first chunk when not primary node
The first chunk recorded is likely a partial chunk. When not the cluster primary dont serve the first chunk out of memory. Instead the chunk will be fetched from Cassandra.
1 parent 4aad15e commit 2029113

File tree

3 files changed

+51
-22
lines changed

3 files changed

+51
-22
lines changed

metric_tank/aggmetric.go

+20
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type AggMetric struct {
2929
aggregators []*Aggregator
3030
writeQueue chan *Chunk
3131
activeWrite bool
32+
firstChunkT0 uint32
3233
}
3334

3435
// re-order the chunks with the oldest at start of the list and newest at the end.
@@ -254,6 +255,21 @@ func (a *AggMetric) Get(from, to uint32) (uint32, []Iter) {
254255
return math.MaxInt32, make([]Iter, 0)
255256
}
256257

258+
// The first chunk is likely only a partial chunk. If we are not the primary node
259+
// we should not serve data from this chunk, and should instead get the chunk from cassandra.
260+
// if we are the primary node, then there is likely no data in Cassandra anyway.
261+
if !clusterStatus.IsPrimary() && oldestChunk.T0 == a.firstChunkT0 {
262+
oldestPos++
263+
if oldestPos >= len(a.Chunks) {
264+
oldestPos = 0
265+
}
266+
oldestChunk = a.getChunk(oldestPos)
267+
if oldestChunk == nil {
268+
log.Error(3, "unexpected nil chunk.")
269+
return math.MaxInt32, make([]Iter, 0)
270+
}
271+
}
272+
257273
if to <= oldestChunk.T0 {
258274
// the requested time range ends before any data we have.
259275
log.Debug("AggMetric %s Get(): no data for requested range", a.Key)
@@ -446,6 +462,10 @@ func (a *AggMetric) Add(ts uint32, val float64) {
446462
// no data has been added to this metric at all.
447463
a.Chunks = append(a.Chunks, NewChunk(t0))
448464

465+
// The first chunk is typically going to be a partial chunk
466+
// so we keep a record of it.
467+
a.firstChunkT0 = t0
468+
449469
if err := a.Chunks[0].Push(ts, val); err != nil {
450470
panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos 0 failed: %q", ts, val, err))
451471
}

metric_tank/aggmetric_test.go

+28-22
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ func (c *Checker) Add(ts uint32, val float64) {
3434
// from to is the range that gets requested from AggMetric
3535
// first/last is what we use as data range to compare to (both inclusive)
3636
// these may be different because AggMetric returns broader rangers (due to packed format),
37-
func (c *Checker) Verify(from, to, first, last uint32) {
37+
func (c *Checker) Verify(primary bool, from, to, first, last uint32) {
38+
currentClusterStatus := clusterStatus.IsPrimary()
39+
clusterStatus.Set(primary)
3840
_, iters := c.agg.Get(from, to)
3941
// we don't do checking or fancy logic, it is assumed that the caller made sure first and last are ts of actual points
4042
var pi int // index of first point we want
@@ -61,6 +63,7 @@ func (c *Checker) Verify(from, to, first, last uint32) {
6163
if index != pj {
6264
c.t.Fatalf("not all values returned. missing %v", c.points[index:pj+1])
6365
}
66+
clusterStatus.Set(currentClusterStatus)
6467
}
6568

6669
func TestAggMetric(t *testing.T) {
@@ -72,52 +75,55 @@ func TestAggMetric(t *testing.T) {
7275

7376
// basic case, single range
7477
c.Add(101, 101)
75-
c.Verify(100, 200, 101, 101)
78+
c.Verify(true, 100, 200, 101, 101)
7679
c.Add(105, 105)
77-
c.Verify(100, 199, 101, 105)
80+
c.Verify(true, 100, 199, 101, 105)
7881
c.Add(115, 115)
7982
c.Add(125, 125)
8083
c.Add(135, 135)
81-
c.Verify(100, 199, 101, 135)
84+
c.Verify(true, 100, 199, 101, 135)
8285

8386
// add new ranges, aligned and unaligned
8487
c.Add(200, 200)
8588
c.Add(315, 315)
86-
c.Verify(100, 399, 101, 315)
89+
c.Verify(true, 100, 399, 101, 315)
90+
91+
// verify as secondary node. Data from the first chunk should not be returned.
92+
c.Verify(false, 100, 399, 200, 315)
8793

8894
// get subranges
89-
c.Verify(120, 299, 101, 200)
90-
c.Verify(220, 299, 200, 200)
91-
c.Verify(312, 330, 315, 315)
95+
c.Verify(true, 120, 299, 101, 200)
96+
c.Verify(true, 220, 299, 200, 200)
97+
c.Verify(true, 312, 330, 315, 315)
9298

9399
// border dancing. good for testing inclusivity and exclusivity
94-
c.Verify(100, 199, 101, 135)
95-
c.Verify(100, 200, 101, 135)
96-
c.Verify(100, 201, 101, 200)
97-
c.Verify(198, 199, 101, 135)
98-
c.Verify(199, 200, 101, 135)
99-
c.Verify(200, 201, 200, 200)
100-
c.Verify(201, 202, 200, 200)
101-
c.Verify(299, 300, 200, 200)
102-
c.Verify(300, 301, 315, 315)
100+
c.Verify(true, 100, 199, 101, 135)
101+
c.Verify(true, 100, 200, 101, 135)
102+
c.Verify(true, 100, 201, 101, 200)
103+
c.Verify(true, 198, 199, 101, 135)
104+
c.Verify(true, 199, 200, 101, 135)
105+
c.Verify(true, 200, 201, 200, 200)
106+
c.Verify(true, 201, 202, 200, 200)
107+
c.Verify(true, 299, 300, 200, 200)
108+
c.Verify(true, 300, 301, 315, 315)
103109

104110
// skipping
105111
c.Add(510, 510)
106112
c.Add(512, 512)
107-
c.Verify(100, 599, 101, 512)
113+
c.Verify(true, 100, 599, 101, 512)
108114

109115
// basic wraparound
110116
c.Add(610, 610)
111117
c.Add(612, 612)
112118
c.Add(710, 710)
113119
c.Add(712, 712)
114120
// TODO would be nice to test that it panics when requesting old range. something with recover?
115-
//c.Verify(100, 799, 101, 512)
121+
//c.Verify(true, 100, 799, 101, 512)
116122

117123
// largest range we have so far
118-
c.Verify(300, 799, 315, 712)
124+
c.Verify(true, 300, 799, 315, 712)
119125
// a smaller range
120-
c.Verify(502, 799, 510, 712)
126+
c.Verify(true, 502, 799, 510, 712)
121127

122128
// the circular buffer had these ranges:
123129
// 100 200 300 skipped 500
@@ -130,7 +136,7 @@ func TestAggMetric(t *testing.T) {
130136
// but we just check we only get this point
131137
c.Add(1299, 1299)
132138
// TODO: implement skips and enable this
133-
// c.Verify(800, 1299, 1299, 1299)
139+
// c.Verify(true, 800, 1299, 1299, 1299)
134140
}
135141

136142
// basic expected RAM usage for 1 iteration (= 1 days)

metric_tank/aggregator_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ func TestAggBoundary(t *testing.T) {
3636

3737
// note that values don't get "committed" to the metric until the aggregation interval is complete
3838
func TestAggregator(t *testing.T) {
39+
clusterStatus = NewClusterStatus("default", false)
3940
compare := func(key string, metric Metric, expected []Point) {
41+
clusterStatus.Set(true)
4042
_, iters := metric.Get(0, 1000)
4143
got := make([]Point, 0, len(expected))
4244
for _, iter := range iters {
@@ -56,6 +58,7 @@ func TestAggregator(t *testing.T) {
5658
}
5759
}
5860
}
61+
clusterStatus.Set(false)
5962
}
6063
agg := NewAggregator("test", 60, 120, 10, 10)
6164
agg.Add(100, 123.4)

0 commit comments

Comments
 (0)