Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 7115145

Browse files
authored
Merge pull request #1006 from grafana/fixChunkCacheAddRangeCorruption
Fix chunk cache add range corruption
2 parents ce63286 + b0e25d7 commit 7115145

File tree

2 files changed

+79
-0
lines changed

2 files changed

+79
-0
lines changed

mdata/cache/ccache_metric.go

+2
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ func (mc *CCacheMetric) AddRange(prev uint32, itergens []chunk.IterGen) {
185185
})
186186
mc.chunks[ts] = &chunks[len(chunks)-1]
187187
mc.keys = append(mc.keys, ts)
188+
} else {
189+
mc.chunks[ts].Prev = prev
188190
}
189191

190192
if sortKeys {

mdata/cache/ccache_metric_test.go

+77
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package cache
22

33
import (
4+
"errors"
5+
"fmt"
6+
"sort"
47
"testing"
58

9+
"github.com/grafana/metrictank/mdata/cache/accnt"
610
"github.com/grafana/metrictank/mdata/chunk"
711
"github.com/grafana/metrictank/test"
812
"github.com/raintank/schema"
@@ -187,3 +191,76 @@ func BenchmarkAddRangeDesc64(b *testing.B) {
187191
ccm.AddRange(0, chunks[i:i+64])
188192
}
189193
}
194+
195+
func TestCorruptionCase1(t *testing.T) {
196+
testRun(t, func(ccm *CCacheMetric) {
197+
chunks := generateChunks(t, 10, 6, 10)
198+
ccm.AddRange(0, chunks[3:6])
199+
ccm.AddRange(0, chunks[0:4])
200+
if err := verifyCcm(ccm); err != nil {
201+
t.Fatal(err)
202+
}
203+
})
204+
}
205+
206+
// verifyCcm verifies the integrity of a CCacheMetric
207+
// it assumes that all itergens are span-aware
208+
func verifyCcm(ccm *CCacheMetric) error {
209+
var chunk *CCacheChunk
210+
var ok bool
211+
212+
if len(ccm.chunks) != len(ccm.keys) {
213+
return errors.New("Length of ccm.chunks does not match ccm.keys")
214+
}
215+
216+
if !sort.IsSorted(accnt.Uint32Asc(ccm.keys)) {
217+
return errors.New("keys are not sorted")
218+
}
219+
220+
for i, ts := range ccm.keys {
221+
if chunk, ok = ccm.chunks[ts]; !ok {
222+
return fmt.Errorf("Ts %d is in ccm.keys but not in ccm.chunks", ts)
223+
}
224+
225+
if i == 0 {
226+
if chunk.Prev != 0 {
227+
return errors.New("First chunk has Prev != 0")
228+
}
229+
} else {
230+
if chunk.Prev == 0 {
231+
if ccm.chunks[ccm.keys[i-1]].Ts == chunk.Ts-chunk.Itgen.Span {
232+
return fmt.Errorf("Chunk of ts %d has Prev == 0, but the previous chunk is present", ts)
233+
}
234+
} else {
235+
if ccm.chunks[ccm.keys[i-1]].Ts != chunk.Prev {
236+
return fmt.Errorf("Chunk of ts %d has Prev set to wrong ts %d but should be %d", ts, chunk.Prev, ccm.chunks[ccm.keys[i-1]].Ts)
237+
}
238+
}
239+
}
240+
241+
if i == len(ccm.keys)-1 {
242+
if chunk.Next != 0 {
243+
return fmt.Errorf("Next of last chunk should be 0, but it's %d", chunk.Next)
244+
}
245+
246+
// all checks completed
247+
break
248+
}
249+
250+
var nextChunk *CCacheChunk
251+
if nextChunk, ok = ccm.chunks[ccm.keys[i+1]]; !ok {
252+
return fmt.Errorf("Ts %d is in ccm.keys but not in ccm.chunks", ccm.keys[i+1])
253+
}
254+
255+
if chunk.Next == 0 {
256+
if chunk.Ts+chunk.Itgen.Span == nextChunk.Ts {
257+
return fmt.Errorf("Next of chunk at ts %d is set to 0, but the next chunk is present", ts)
258+
}
259+
} else {
260+
if chunk.Next != nextChunk.Ts {
261+
return fmt.Errorf("Next of chunk at ts %d is set to %d, but it should be %d", ts, chunk.Next, nextChunk.Ts)
262+
}
263+
}
264+
}
265+
return nil
266+
}

0 commit comments

Comments
 (0)