Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit b6711ab

Browse files
authored
Merge pull request #994 from grafana/read-from-first-chunk
read from first chunks
2 parents 6933255 + b99a0fd commit b6711ab

File tree

4 files changed

+30
-34
lines changed

4 files changed

+30
-34
lines changed

mdata/aggmetric.go

+17-24
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ type AggMetric struct {
3838
Chunks []*chunk.Chunk
3939
aggregators []*Aggregator
4040
dropFirstChunk bool
41-
firstChunkT0 uint32
4241
ttl uint32
4342
lastSaveStart uint32 // last chunk T0 that was added to the write Queue.
4443
lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra.
4544
lastWrite uint32
45+
firstTs uint32
4646
}
4747

4848
// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
@@ -268,27 +268,16 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) {
268268
return result, ErrNilChunk
269269
}
270270

271-
// The first chunk is likely only a partial chunk. If we are not the primary node
272-
// we should not serve data from this chunk, and should instead get the chunk from cassandra.
273-
// if we are the primary node, then there is likely no data in Cassandra anyway.
274-
if !cluster.Manager.IsPrimary() && oldestChunk.T0 == a.firstChunkT0 {
275-
oldestPos++
276-
if oldestPos >= len(a.Chunks) {
277-
oldestPos = 0
278-
}
279-
oldestChunk = a.getChunk(oldestPos)
280-
if oldestChunk == nil {
281-
log.Error(3, "%s", ErrNilChunk)
282-
return result, ErrNilChunk
283-
}
284-
}
285-
286271
if to <= oldestChunk.T0 {
287272
// the requested time range ends before any data we have.
288273
if LogLevel < 2 {
289274
log.Debug("AM %s Get(): no data for requested range", a.Key)
290275
}
291-
result.Oldest = oldestChunk.T0
276+
if oldestChunk.First {
277+
result.Oldest = a.firstTs
278+
} else {
279+
result.Oldest = oldestChunk.T0
280+
}
292281
return result, nil
293282
}
294283

@@ -342,8 +331,13 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) {
342331
}
343332
}
344333

334+
if oldestChunk.First {
335+
result.Oldest = a.firstTs
336+
} else {
337+
result.Oldest = oldestChunk.T0
338+
}
339+
345340
memToIterDuration.Value(time.Now().Sub(pre))
346-
result.Oldest = oldestChunk.T0
347341
return result, nil
348342
}
349343

@@ -483,12 +477,11 @@ func (a *AggMetric) add(ts uint32, val float64) {
483477

484478
if len(a.Chunks) == 0 {
485479
chunkCreate.Inc()
486-
// no data has been added to this metric at all.
487-
a.Chunks = append(a.Chunks, chunk.New(t0))
488-
489-
// The first chunk is typically going to be a partial chunk
490-
// so we keep a record of it.
491-
a.firstChunkT0 = t0
480+
// no data has been added to this AggMetric yet.
481+
// note that we may not be aware of prior data that belongs into this chunk
482+
// so we should track this cutoff point
483+
a.Chunks = append(a.Chunks, chunk.NewFirst(t0))
484+
a.firstTs = ts
492485

493486
if err := a.Chunks[0].Push(ts, val); err != nil {
494487
panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos 0 failed: %q", ts, val, err))

mdata/aggmetric_test.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) {
7777
}
7878
for pj = pi; c.points[pj].ts != last; pj++ {
7979
}
80-
c.t.Logf("verifying AggMetric.Get(%d,%d) =?= %d <= ts <= %d", from, to, first, last)
80+
c.t.Logf("verifying AggMetric.Get(%d,%d) -> range is %d - %d ?", from, to, first, last)
8181
index := pi - 1
8282
for _, iter := range res.Iters {
8383
for iter.Next() {
@@ -182,9 +182,7 @@ func TestAggMetric(t *testing.T) {
182182
c.Add(200, 200)
183183
c.Add(315, 315)
184184
c.Verify(true, 100, 399, 101, 315)
185-
186-
// verify as secondary node. Data from the first chunk should not be returned.
187-
c.Verify(false, 100, 399, 200, 315)
185+
c.Verify(false, 100, 399, 101, 315)
188186

189187
// get subranges
190188
c.Verify(true, 120, 299, 101, 200)

mdata/chunk/chunk.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,25 @@ type Chunk struct {
1515
tsz.Series
1616
LastTs uint32 // last TS seen, not computed or anything
1717
NumPoints uint32
18+
First bool
1819
Closed bool
1920
}
2021

2122
func New(t0 uint32) *Chunk {
2223
return &Chunk{
23-
Series: *tsz.New(t0),
24-
LastTs: 0,
25-
NumPoints: 0,
26-
Closed: false,
24+
Series: *tsz.New(t0),
25+
}
26+
}
27+
28+
func NewFirst(t0 uint32) *Chunk {
29+
return &Chunk{
30+
Series: *tsz.New(t0),
31+
First: true,
2732
}
2833
}
2934

3035
func (c *Chunk) String() string {
31-
return fmt.Sprintf("<chunk T0=%d, LastTs=%d, NumPoints=%d, Closed=%t>", c.T0, c.LastTs, c.NumPoints, c.Closed)
36+
return fmt.Sprintf("<chunk T0=%d, LastTs=%d, NumPoints=%d, First=%t, Closed=%t>", c.T0, c.LastTs, c.NumPoints, c.First, c.Closed)
3237

3338
}
3439
func (c *Chunk) Push(t uint32, v float64) error {

mdata/result.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ import (
88
type Result struct {
99
Points []schema.Point
1010
Iters []chunk.Iter
11-
Oldest uint32
11+
Oldest uint32 // timestamp of oldest point we have, to know when and when not we may need to query slower storage
1212
}

0 commit comments

Comments
 (0)