diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index fa0c8fc05c..30ac0cea3d 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -38,11 +38,11 @@ type AggMetric struct { Chunks []*chunk.Chunk aggregators []*Aggregator dropFirstChunk bool - firstChunkT0 uint32 ttl uint32 lastSaveStart uint32 // last chunk T0 that was added to the write Queue. lastSaveFinish uint32 // last chunk T0 successfully written to Cassandra. lastWrite uint32 + firstTs uint32 } // NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long @@ -268,27 +268,16 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) { return result, ErrNilChunk } - // The first chunk is likely only a partial chunk. If we are not the primary node - // we should not serve data from this chunk, and should instead get the chunk from cassandra. - // if we are the primary node, then there is likely no data in Cassandra anyway. - if !cluster.Manager.IsPrimary() && oldestChunk.T0 == a.firstChunkT0 { - oldestPos++ - if oldestPos >= len(a.Chunks) { - oldestPos = 0 - } - oldestChunk = a.getChunk(oldestPos) - if oldestChunk == nil { - log.Error(3, "%s", ErrNilChunk) - return result, ErrNilChunk - } - } - if to <= oldestChunk.T0 { // the requested time range ends before any data we have. if LogLevel < 2 { log.Debug("AM %s Get(): no data for requested range", a.Key) } - result.Oldest = oldestChunk.T0 + if oldestChunk.First { + result.Oldest = a.firstTs + } else { + result.Oldest = oldestChunk.T0 + } return result, nil } @@ -342,8 +331,13 @@ func (a *AggMetric) Get(from, to uint32) (Result, error) { } } + if oldestChunk.First { + result.Oldest = a.firstTs + } else { + result.Oldest = oldestChunk.T0 + } + memToIterDuration.Value(time.Now().Sub(pre)) - result.Oldest = oldestChunk.T0 return result, nil } @@ -483,12 +477,11 @@ func (a *AggMetric) add(ts uint32, val float64) { if len(a.Chunks) == 0 { chunkCreate.Inc() - // no data has been added to this metric at all. - a.Chunks = append(a.Chunks, chunk.New(t0)) - - // The first chunk is typically going to be a partial chunk - // so we keep a record of it. - a.firstChunkT0 = t0 + // no data has been added to this AggMetric yet. + // note that we may not be aware of prior data that belongs into this chunk + // so we should track this cutoff point + a.Chunks = append(a.Chunks, chunk.NewFirst(t0)) + a.firstTs = ts if err := a.Chunks[0].Push(ts, val); err != nil { panic(fmt.Sprintf("FATAL ERROR: this should never happen. Pushing initial value <%d,%f> to new chunk at pos 0 failed: %q", ts, val, err)) diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 1edf87ec06..1b3e838b56 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -77,7 +77,7 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) { } for pj = pi; c.points[pj].ts != last; pj++ { } - c.t.Logf("verifying AggMetric.Get(%d,%d) =?= %d <= ts <= %d", from, to, first, last) + c.t.Logf("verifying AggMetric.Get(%d,%d) -> range is %d - %d ?", from, to, first, last) index := pi - 1 for _, iter := range res.Iters { for iter.Next() { @@ -182,9 +182,7 @@ func TestAggMetric(t *testing.T) { c.Add(200, 200) c.Add(315, 315) c.Verify(true, 100, 399, 101, 315) - - // verify as secondary node. Data from the first chunk should not be returned. - c.Verify(false, 100, 399, 200, 315) + c.Verify(false, 100, 399, 101, 315) // get subranges c.Verify(true, 120, 299, 101, 200) diff --git a/mdata/chunk/chunk.go b/mdata/chunk/chunk.go index 00e84b8b28..828917148f 100644 --- a/mdata/chunk/chunk.go +++ b/mdata/chunk/chunk.go @@ -15,20 +15,25 @@ type Chunk struct { tsz.Series LastTs uint32 // last TS seen, not computed or anything NumPoints uint32 + First bool Closed bool } func New(t0 uint32) *Chunk { return &Chunk{ - Series: *tsz.New(t0), - LastTs: 0, - NumPoints: 0, - Closed: false, + Series: *tsz.New(t0), + } +} + +func NewFirst(t0 uint32) *Chunk { + return &Chunk{ + Series: *tsz.New(t0), + First: true, } } func (c *Chunk) String() string { - return fmt.Sprintf("", c.T0, c.LastTs, c.NumPoints, c.Closed) + return fmt.Sprintf("", c.T0, c.LastTs, c.NumPoints, c.First, c.Closed) } func (c *Chunk) Push(t uint32, v float64) error { diff --git a/mdata/result.go b/mdata/result.go index afe148e557..2b14e789b7 100644 --- a/mdata/result.go +++ b/mdata/result.go @@ -8,5 +8,5 @@ import ( type Result struct { Points []schema.Point Iters []chunk.Iter - Oldest uint32 + Oldest uint32 // timestamp of oldest point we have, to know when and when not we may need to query slower storage }