This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 2d12b85

read from first chunks
This is an old optimization (?) that has been with us for a long time: #74 2029113

Here's how it caused data loss at read time:

- When only 1 chunk of data had been filled: the "update" of the field is a no-op because len(chunks) == 1, so oldPos goes back to 0 (not sure if intentional or a bug), so reading the first chunk worked.
- Once you have more than 1 chunk: the update of oldPos works and we start hitting cassandra. Depending on how long the chunk takes to get saved to cassandra, we will miss data at read time. Also, our chunk cache does not cache absence of data, hitting cassandra harder during this period.
- Once the chunk is saved to cassandra, the problem disappears.
- Once the circular buffer recycles the first time (effectively removing the first chunk), this optimization no longer applies, but at that point we still hit cassandra just as before.

This problem is now solved. However, removing that code enables another avenue for data loss at read time:

- when a read node starts (without data backfill)
- or a read node starts with data backfill, but the backfill doesn't have old data for the particular metric, IOW when the data only covers 1 chunk's worth
- or a read node starts with data backfill, but since backfilling starts at arbitrary positions, the first chunk will miss some data in the beginning.

In all these cases, the first chunk is a partial chunk, whereas a full version of the chunk is most likely already in cassandra. To make sure this is not a problem, if the first chunk we used was partial, we set oldest to the first timestamp, so that the rest can be retrieved from cassandra. Typically, this will cause the "same" chunk (but a full version) to be retrieved from cassandra, which is then cached and seamlessly merged via Fix().
1 parent 34febb0 commit 2d12b85
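
To make the fix concrete, here is a minimal, self-contained Go sketch of the decision Get() now makes when reporting how far back the in-memory data can be trusted. The function oldestServable and its parameters are illustrative stand-ins, not the actual code; the real logic operates on oldestChunk.First, a.firstTs and oldestChunk.T0, as shown in the mdata/aggmetric.go diff below.

package main

import "fmt"

// oldestServable sketches the new rule: if the oldest chunk we can serve from
// memory is the special "first" chunk, it may be missing points between its T0
// and the first timestamp that was actually pushed, so we only vouch for data
// back to firstTs and let anything older be retrieved from cassandra.
func oldestServable(chunkT0, firstTs uint32, isFirstChunk bool) uint32 {
	if isFirstChunk {
		return firstTs
	}
	return chunkT0
}

func main() {
	// a first chunk with t0=600 whose earliest pushed point is ts=630:
	fmt.Println(oldestServable(600, 630, true))  // 630: points older than 630 must come from cassandra
	fmt.Println(oldestServable(600, 630, false)) // 600: a non-first chunk is complete from its t0
}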

File tree

3 files changed: +28 -30 lines changed


mdata/aggmetric.go (+17 -24)

@@ -38,11 +38,11 @@ type AggMetric struct {
 	Chunks         []*chunk.Chunk
 	aggregators    []*Aggregator
 	dropFirstChunk bool
-	firstChunkT0   uint32
 	ttl            uint32
 	lastSaveStart  uint32 // last chunk T0 that was added to the write Queue.
 	lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
 	lastWrite      uint32
+	firstTs        uint32
 }

 // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
@@ -268,27 +268,16 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) {
 		return result, ErrNilChunk
 	}

-	// The first chunk is likely only a partial chunk. If we are not the primary node
-	// we should not serve data from this chunk, and should instead get the chunk from cassandra.
-	// if we are the primary node, then there is likely no data in Cassandra anyway.
-	if !cluster.Manager.IsPrimary() && oldestChunk.T0 == a.firstChunkT0 {
-		oldestPos++
-		if oldestPos >= len(a.Chunks) {
-			oldestPos = 0
-		}
-		oldestChunk = a.getChunk(oldestPos)
-		if oldestChunk == nil {
-			log.Error(3, "%s", ErrNilChunk)
-			return result, ErrNilChunk
-		}
-	}
-
 	if to <= oldestChunk.T0 {
 		// the requested time range ends before any data we have.
 		if LogLevel < 2 {
 			log.Debug("AM %s Get(): no data for requested range", a.Key)
 		}
-		result.Oldest = oldestChunk.T0
+		if oldestChunk.First {
+			result.Oldest = a.firstTs
+		} else {
+			result.Oldest = oldestChunk.T0
+		}
 		return result, nil
 	}

@@ -342,8 +331,13 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) {
 		}
 	}

+	if oldestChunk.First {
+		result.Oldest = a.firstTs
+	} else {
+		result.Oldest = oldestChunk.T0
+	}
+
 	memToIterDuration.Value(time.Now().Sub(pre))
-	result.Oldest = oldestChunk.T0
 	return result, nil
 }

@@ -483,12 +477,11 @@ func (a *AggMetric) add(ts uint32, val float64) {

 	if len(a.Chunks) == 0 {
 		chunkCreate.Inc()
-		// no data has been added to this metric at all.
-		a.Chunks = append(a.Chunks, chunk.New(t0))
-
-		// The first chunk is typically going to be a partial chunk
-		// so we keep a record of it.
-		a.firstChunkT0 = t0
+		// no data has been added to this AggMetric yet.
+		// note that we may not be aware of prior data that belongs into this chunk
+		// so we should track this cutoff point
+		a.Chunks = append(a.Chunks, chunk.NewFirst(t0))
+		a.firstTs = ts

 		if err := a.Chunks[0].Push(ts, val); err != nil {
 			panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos 0 failed: %q", ts, val, err))

mdata/chunk/chunk.go (+10 -5)

@@ -15,20 +15,25 @@ type Chunk struct {
 	tsz.Series
 	LastTs    uint32 // last TS seen, not computed or anything
 	NumPoints uint32
+	First     bool
 	Closed    bool
 }

 func New(t0 uint32) *Chunk {
 	return &Chunk{
-		Series:    *tsz.New(t0),
-		LastTs:    0,
-		NumPoints: 0,
-		Closed:    false,
+		Series: *tsz.New(t0),
+	}
+}
+
+func NewFirst(t0 uint32) *Chunk {
+	return &Chunk{
+		Series: *tsz.New(t0),
+		First:  true,
 	}
 }

 func (c *Chunk) String() string {
-	return fmt.Sprintf("<chunk T0=%d, LastTs=%d, NumPoints=%d, Closed=%t>", c.T0, c.LastTs, c.NumPoints, c.Closed)
+	return fmt.Sprintf("<chunk T0=%d, LastTs=%d, NumPoints=%d, First=%t, Closed=%t>", c.T0, c.LastTs, c.NumPoints, c.First, c.Closed)

 }
 func (c *Chunk) Push(t uint32, v float64) error {

mdata/result.go (+1 -1)

@@ -8,5 +8,5 @@ import (
 type Result struct {
 	Points []schema.Point
 	Iters  []chunk.Iter
-	Oldest uint32
+	Oldest uint32 // timestamp of oldest point we have, to know when and when not we may need to query slower storage
 }
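
For context on how Oldest is meant to be consumed, here is a hypothetical, self-contained sketch (not part of this commit; Point, readResult, fetchFromCassandra and getSeries are stand-ins for the real types and read path): if the in-memory result cannot vouch for the full requested range, the remainder is read from slower storage and prepended, which is how a full copy of a partial first chunk would end up being fetched and then merged/deduplicated (via Fix() in metrictank).

package main

import "fmt"

// Point and readResult are simplified stand-ins for schema.Point and mdata.Result.
type Point struct {
	Val float64
	Ts  uint32
}

type readResult struct {
	Points []Point
	Oldest uint32 // oldest timestamp the in-memory data can vouch for
}

// fetchFromCassandra is a hypothetical stand-in for the chunk-store read path.
func fetchFromCassandra(key string, from, to uint32) []Point {
	// would read (and cache) chunks covering [from, to) from cassandra
	return nil
}

// getSeries shows the intended use of Oldest: anything older than it is
// fetched from slower storage and prepended; overlapping points from the
// partial first chunk and its full copy get merged/deduplicated afterwards.
func getSeries(key string, mem readResult, from, to uint32) []Point {
	points := mem.Points
	if mem.Oldest > from {
		older := fetchFromCassandra(key, from, mem.Oldest)
		points = append(older, points...)
	}
	return points
}

func main() {
	mem := readResult{Points: []Point{{Val: 1.5, Ts: 630}}, Oldest: 630}
	fmt.Println(getSeries("some.metric.key", mem, 600, 660))
}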
