diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 5311e4d7bb..7b5a5a3a3e 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -93,12 +93,13 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point { } else if p.Ts > t { // point is too recent, append a null and reconsider same point for next slot out[o] = schema.Point{Val: math.NaN(), Ts: t} - } else if p.Ts > t-interval && p.Ts < t { - // point is a bit older, so it's good enough, just quantize the ts, and move on to next point for next round + } else if p.Ts > t-interval { + // point is older but not by more than 1 interval, so it's good enough, just quantize the ts, and move on to next point for next round out[o] = schema.Point{Val: p.Val, Ts: t} i++ - } else if p.Ts <= t-interval { - // point is too old. advance until we find a point that is recent enough, and then go through the considerations again, + } else { + // point is too old (older by 1 interval or more). + // advance until we find a point that is recent enough, and then go through the considerations again, // if those considerations are any of the above ones. // if the last point would end up in this branch again, discard it as well. for p.Ts <= t-interval && i < len(in)-1 { @@ -197,6 +198,8 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se return out, nil } +// getTargetsRemote issues the requests on other nodes +// it's nothing more than a thin network wrapper around getTargetsLocal of a peer. func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) { responses := make(chan getTargetsResp, len(remoteReqs)) rCtx, cancel := context.WithCancel(ctx) @@ -386,6 +389,9 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) { } } +// getSeriesFixed gets the series and makes sure the output is quantized +// (needed because the raw chunks don't contain quantized data) +// TODO: we can probably forego Fix if archive > 0 func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) { select { case <-ctx.Done(): @@ -394,6 +400,10 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato default: } rctx := newRequestContext(ctx, &req, consolidator) + // see newRequestContext for a detailed explanation of this. + if rctx.From == rctx.To { + return nil, nil + } res, err := s.getSeries(rctx) if err != nil { return nil, err @@ -408,6 +418,9 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato return Fix(res.Points, req.From, req.To, req.ArchInterval), nil } +// getSeries returns points from mem (and store if needed), within the range from (inclusive) - to (exclusive) +// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval +// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series. 
func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { res, err := s.getSeriesAggMetrics(ctx) if err != nil { @@ -431,6 +444,7 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { // if oldest < to -> search until oldest, we already have the rest from mem // if to < oldest -> no need to search until oldest, only search until to + // adds iters from both the cache and the store (if applicable) until := util.Min(res.Oldest, ctx.To) fromCache, err := s.getSeriesCachedStore(ctx, until) if err != nil { @@ -440,10 +454,8 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { return res, nil } -// getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive) -// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval -// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series. -// all data will also be quantized. +// itersToPoints converts the iters to points if they are within the from/to range +// TODO: just work on the result directly func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema.Point { pre := time.Now() @@ -481,7 +493,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error) if ctx.Cons != consolidation.None { return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To) } else { - return metric.Get(ctx.From, ctx.To), nil + return metric.Get(ctx.From, ctx.To) } } @@ -500,7 +512,10 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun logLoad("cassan", ctx.AMKey, ctx.From, ctx.To) log.Debug("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until) - cacheRes := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until) + cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until) + if err != nil { + return iters, err + } log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End)) // check to see if the request has been canceled, if so abort now. 
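To make the From/To handling above concrete: the early return added to getSeriesFixed (rctx.From == rctx.To) falls out of the boundary adjustment that newRequestContext performs, as explained in the comment block in a later hunk. The following is a minimal, self-contained sketch, not the metrictank implementation; prevBoundary, adjust and the sample numbers are illustrative assumptions that mirror the behaviour described there.

package main

import "fmt"

// prevBoundary (assumed helper): the largest multiple of interval strictly below ts.
func prevBoundary(ts, interval uint32) uint32 {
	return ((ts - 1) / interval) * interval
}

// adjust mimics the described request-context adjustment for raw (non-consolidated) reads:
// both from and to are moved back to the previous boundary plus one.
func adjust(from, to, interval uint32) (uint32, uint32) {
	return prevBoundary(from, interval) + 1, prevBoundary(to, interval) + 1
}

func main() {
	// normal case with archInterval=60: from in 1..60 stays at 1, to in 181..240 becomes 181
	f, t := adjust(35, 210, 60)
	fmt.Println(f, t) // 1 181

	// degenerate case from the newRequestContext comment: from=25, to=36 both collapse to 1,
	// so the caller can detect From == To and return an empty result instead of issuing
	// a from >= to query, which AggMetric, CCache and the store now reject.
	f, t = adjust(25, 36, 60)
	fmt.Println(f, t) // 1 1
}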
@@ -531,7 +546,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
	default:
	}

-	// the request cannot completely be served from cache, it will require cassandra involvement
+	// the request cannot completely be served from cache, it will require store involvement
	if !cacheRes.Complete {
		if cacheRes.From != cacheRes.Until {
			storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
@@ -551,11 +566,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
			if err != nil {
				// TODO(replay) figure out what to do if one piece is corrupt
				tracing.Failure(span)
-				tracing.Errorf(span, "itergen: error getting iter from cassandra slice %+v", err)
+				tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
				return iters, err
			}
			// it's important that the itgens get added in chronological order,
-			// currently we rely on cassandra returning results in order
+			// currently we rely on store returning results in order
			s.Cache.Add(ctx.AMKey, prevts, itgen)
			prevts = itgen.Ts
			iters = append(iters, *it)
@@ -656,8 +671,32 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
	// STORED DATA 0[----------60][---------120][---------180][---------240] but data for 60 may be at 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing)
	// to retrieve the stored data, we also use from inclusive and to exclusive,
	// so to make sure that the data after quantization (Fix()) is correct, we have to make the following adjustment:
-	// `from` 1..60 needs data 1..60 -> always adjust `from` to previous boundary+1 (here 1)
-	// `to` 181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181)
+	// `from` 1..60 needs data 1..60 -> to ensure we can read that data we adjust `from` to previous boundary+1 (here 1). (will be quantized to next boundary in this case 60)
+	// `to` 181..240 needs data 121..180 -> to avoid reading needless data we adjust `to` to previous boundary+1 (here 181), last ts returned must be 180
+
+	// except... there's a special case. let's say archInterval=60, and the user requests:
+	// from=25, to=36
+	// we know that in the graphite model there will be no data in that timeframe:
+	// maybe the user submitted a point with ts=30 but it will be quantized to ts=60 so it is out of range.
+	// but wouldn't it be nice to include it anyway?
+	// we can argue both ways, but the other thing is that if we apply the logic above, we end up with:
+	// from := 1
+	// to := 1
+	// which is a query not accepted by AggMetric, CCache or store. (from must be less than to)
+	// the only way we can get acceptable queries is for from to be 1 and to to remain 36 (or become 60 or 61)
+	// such a fetch request would include the requested point
+	// but we know Fix() will later create the output according to these rules:
+	// * first point should have the first timestamp >= from that divides by interval (that would be 60 in this case)
+	// * last point should have the last timestamp < to that divides by interval (because to is always exclusive) (that would be 0 in this case)
+	// which wouldn't make sense of course. one could argue it should output one point with ts=60,
+	// but to do that, we have to "broaden" the `to` requested by the user, covering a larger time frame they didn't actually request.
+	// and we deviate from the quantizing model.
+ // I think we should just stick to the quantizing model + + // we can do the logic above backwards: if from and to are adjusted to the same value, such as 181, it means `from` was 181..240 and `to` was 181..240 + // which is either a nonsensical request (from > to, from == to) or from < to but such that the requested timeframe falls in between two adjacent quantized + // timestamps and could not include either of them. + // so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client. if consolidator == consolidation.None { rc.From = prevBoundary(req.From, req.ArchInterval) + 1 diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 5c0ce49b8a..48c4bb1893 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -447,17 +447,24 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk { var chunks []chunk.Chunk c := chunk.New(start) - for i := start; i < end; i++ { - c.Push(i, float64((i-start)*2)) - if (i+1)%span == 0 { - // Mark the chunk that just got finished as finished + for ts := start; ts < end; ts++ { + val := float64((ts - start) * 2) + c.Push(ts, val) + // handle the case of this being the last point for this chunk + if (ts+1)%span == 0 { c.Finish() chunks = append(chunks, *c) - if i < end { - c = chunk.New(i + 1) + // if there will be a next iteration, prepare the chunk + if ts+1 < end { + c = chunk.New(ts + 1) } } } + // if end was not quantized we have to finish the last chunk + if !c.Closed { + c.Finish() + chunks = append(chunks, *c) + } return chunks } @@ -479,10 +486,9 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk { // func TestGetSeriesCachedStore(t *testing.T) { span := uint32(600) - // save some electrons by skipping steps that are no edge cases - steps := span / 10 start := span // we want 10 chunks to serve the largest testcase + // they will have t0 600, 1200, ..., 5400, 6000 end := span * 11 chunks := generateChunks(span, start, end) @@ -493,12 +499,12 @@ func TestGetSeriesCachedStore(t *testing.T) { metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0) srv.BindMemoryStore(metrics) metric := test.GetAMKey(1) - var c *cache.CCache - var itgen *chunk.IterGen - var prevts uint32 type testcase struct { - // the pattern of chunks in store, cache or both + // the pattern of chunks + // c: in cache + // s: in store + // b: in both Pattern string // expected number of cache hits on query over all chunks @@ -521,7 +527,7 @@ func TestGetSeriesCachedStore(t *testing.T) { for _, tc := range testcases { pattern := tc.Pattern - // last ts is start ts plus the number of spans the pattern defines + // lastTs is the t0 of the first chunk that comes after the used range lastTs := start + span*uint32(len(pattern)) // we want to query through various ranges, including: @@ -529,16 +535,20 @@ func TestGetSeriesCachedStore(t *testing.T) { // - from first ts to last ts // - from last ts to last ts // and various ranges between - for from := start; from <= lastTs; from += steps { - for to := from; to <= lastTs; to += steps { - // reinstantiate the cache at the beginning of each run - c = cache.NewCCache() + // we increment from and to in tenths of a span, + // because incrementing by 1 would be needlessly expensive + step := span / 10 + for from := start; from <= lastTs; from += step { + for to := from; to <= lastTs; to += step { + // use fresh store and cache + c := cache.NewCCache() srv.BindCache(c) + store.Reset() // populate cache and store 
according to pattern definition - prevts = 0 + var prevts uint32 for i := 0; i < len(tc.Pattern); i++ { - itgen = chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span) + itgen := chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span) if pattern[i] == 'c' || pattern[i] == 'b' { c.Add(metric, prevts, *itgen) } @@ -554,6 +564,14 @@ func TestGetSeriesCachedStore(t *testing.T) { req.ArchInterval = 1 ctx := newRequestContext(test.NewContext(), &req, consolidation.None) iters, err := srv.getSeriesCachedStore(ctx, to) + + // test invalid query; from must be less than to + if from == to { + if err == nil { + t.Fatalf("Pattern %s From=To %d: expected err, got nil", pattern, from) + } + continue + } if err != nil { t.Fatalf("Pattern %s From %d To %d: error %s", pattern, from, to, err) } @@ -568,7 +586,7 @@ func TestGetSeriesCachedStore(t *testing.T) { // we use the tsTracker to increase together with the iterators and compare at each step tsTracker := expectResFrom - tsSlice := make([]uint32, 0) + var tsSlice []uint32 for i, it := range iters { for it.Next() { ts, _ := it.Values() @@ -580,18 +598,14 @@ func TestGetSeriesCachedStore(t *testing.T) { } } - if to-from > 0 { - if len(tsSlice) == 0 { - t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to) - } - if tsSlice[0] != expectResFrom { - t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0]) - } - if tsSlice[len(tsSlice)-1] != expectResTo { - t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1]) - } - } else if len(tsSlice) > 0 { - t.Fatalf("Pattern %s From %d To %d; Expected results to have len 0 but got %d", pattern, from, to, len(tsSlice)) + if len(tsSlice) == 0 { + t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to) + } + if tsSlice[0] != expectResFrom { + t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0]) + } + if tsSlice[len(tsSlice)-1] != expectResTo { + t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1]) } expectedHits := uint32(0) @@ -599,55 +613,51 @@ func TestGetSeriesCachedStore(t *testing.T) { // because ranges are exclusive at the end we'll test for to - 1 exclTo := to - 1 - // if from is equal to we always expect 0 hits - if from != to { + // seek hits from beginning of the searched ranged within the given pattern + for i := 0; i < len(pattern); i++ { - // seek hits from beginning of the searched ranged within the given pattern - for i := 0; i < len(pattern); i++ { + // if pattern index is lower than from's chunk we continue + if from-(from%span) > start+uint32(i)*span { + continue + } + + // current pattern index is a cache hit, so we expect one more + if pattern[i] == 'c' || pattern[i] == 'b' { + expectedHits++ + } else { + break + } - // if pattern index is lower than from's chunk we continue - if from-(from%span) > start+uint32(i)*span { + // if we've already seeked beyond to's pattern we break and mark the seek as complete + if exclTo-(exclTo%span) == start+uint32(i)*span { + complete = true + break + } + } + + // only if the previous seek was not complete we launch one from the other end + if !complete { + + // now the same from the other end (just like the cache searching does) + for i := len(pattern) - 1; i >= 0; i-- { + + // if pattern 
index is above to's chunk we continue + if exclTo-(exclTo%span)+span <= start+uint32(i)*span { continue } - // current pattern index is a cache hit, so we expect one more + // current pattern index is a cache hit, so we expecte one more if pattern[i] == 'c' || pattern[i] == 'b' { expectedHits++ } else { break } - // if we've already seeked beyond to's pattern we break and mark the seek as complete - if exclTo-(exclTo%span) == start+uint32(i)*span { - complete = true + // if we've already seeked beyond from's pattern we break + if from-(from%span) == start+uint32(i)*span { break } } - - // only if the previous seek was not complete we launch one from the other end - if !complete { - - // now the same from the other end (just like the cache searching does) - for i := len(pattern) - 1; i >= 0; i-- { - - // if pattern index is above to's chunk we continue - if exclTo-(exclTo%span)+span <= start+uint32(i)*span { - continue - } - - // current pattern index is a cache hit, so we expecte one more - if pattern[i] == 'c' || pattern[i] == 'b' { - expectedHits++ - } else { - break - } - - // if we've already seeked beyond from's pattern we break - if from-(from%span) == start+uint32(i)*span { - break - } - } - } } // verify we got all cache hits we should have @@ -659,7 +669,6 @@ func TestGetSeriesCachedStore(t *testing.T) { // stop cache go routines before reinstantiating it at the top of the loop c.Stop() - store.Reset() } } } diff --git a/api/graphite.go b/api/graphite.go index db690537e9..463dd0e7ec 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -149,6 +149,7 @@ func (s *Server) findSeriesLocal(ctx context.Context, orgId uint32, patterns []s return result, nil } +// findSeriesRemote calls findSeriesLocal on a peer via http rpc func (s *Server) findSeriesRemote(ctx context.Context, orgId uint32, patterns []string, seenAfter int64, peer cluster.Node) ([]Series, error) { log.Debug("HTTP Render querying %s/index/find for %d:%q", peer.GetName(), orgId, patterns) data := models.IndexFind{ diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index dad0d51dc6..26fc72394d 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -16,6 +16,9 @@ import ( "gopkg.in/raintank/schema.v1" ) +var ErrInvalidRange = errors.New("AggMetric: invalid range: from must be less than to") +var ErrNilChunk = errors.New("AggMetric: unexpected nil chunk") + // AggMetric takes in new values, updates the in-memory data and streams the points to aggregators // it uses a circular buffer of chunks // each chunk starts at their respective t0 @@ -174,7 +177,7 @@ func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSp if agg == nil { return Result{}, fmt.Errorf("Consolidator %q not configured", consolidator) } - return agg.Get(from, to), nil + return agg.Get(from, to) } } err := fmt.Errorf("internal error: AggMetric.GetAggregated(): unknown aggSpan %d", aggSpan) @@ -185,14 +188,17 @@ func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSp // Get all data between the requested time ranges. From is inclusive, to is exclusive. 
from <= x < to // more data then what's requested may be included -// also returns oldest point we have, so that if your query needs data before it, the caller knows when to query cassandra -func (a *AggMetric) Get(from, to uint32) Result { +// specifically, returns: +// * points from the ROB (if enabled) +// * iters from matching chunks +// * oldest point we have, so that if your query needs data before it, the caller knows when to query the store +func (a *AggMetric) Get(from, to uint32) (Result, error) { pre := time.Now() if LogLevel < 2 { log.Debug("AM %s Get(): %d - %d (%s - %s) span:%ds", a.Key, from, to, TS(from), TS(to), to-from-1) } if from >= to { - panic("invalid request. to must > from") + return Result{}, ErrInvalidRange } a.RLock() defer a.RUnlock() @@ -206,7 +212,7 @@ func (a *AggMetric) Get(from, to uint32) Result { if len(result.Points) > 0 { result.Oldest = result.Points[0].Ts if result.Oldest <= from { - return result + return result, nil } } } @@ -216,7 +222,7 @@ func (a *AggMetric) Get(from, to uint32) Result { if LogLevel < 2 { log.Debug("AM %s Get(): no data for requested range.", a.Key) } - return result + return result, nil } newestChunk := a.getChunk(a.CurrentChunkPos) @@ -237,7 +243,7 @@ func (a *AggMetric) Get(from, to uint32) Result { log.Debug("AM %s Get(): no data for requested range.", a.Key) } result.Oldest = from - return result + return result, nil } // get the oldest chunk we have. @@ -258,8 +264,8 @@ func (a *AggMetric) Get(from, to uint32) Result { oldestChunk := a.getChunk(oldestPos) if oldestChunk == nil { - log.Error(3, "unexpected nil chunk.") - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } // The first chunk is likely only a partial chunk. If we are not the primary node @@ -272,8 +278,8 @@ func (a *AggMetric) Get(from, to uint32) Result { } oldestChunk = a.getChunk(oldestPos) if oldestChunk == nil { - log.Error(3, "unexpected nil chunk.") - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } } @@ -283,7 +289,7 @@ func (a *AggMetric) Get(from, to uint32) Result { log.Debug("AM %s Get(): no data for requested range", a.Key) } result.Oldest = oldestChunk.T0 - return result + return result, nil } // Find the oldest Chunk that the "from" ts falls in. 
If from extends before the oldest @@ -295,9 +301,9 @@ func (a *AggMetric) Get(from, to uint32) Result { } oldestChunk = a.getChunk(oldestPos) if oldestChunk == nil { - log.Error(3, "unexpected nil chunk.") result.Oldest = to - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } } @@ -315,9 +321,9 @@ func (a *AggMetric) Get(from, to uint32) Result { } newestChunk = a.getChunk(newestPos) if newestChunk == nil { - log.Error(3, "unexpected nil chunk.") result.Oldest = to - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } } @@ -338,7 +344,7 @@ func (a *AggMetric) Get(from, to uint32) Result { memToIterDuration.Value(time.Now().Sub(pre)) result.Oldest = oldestChunk.T0 - return result + return result, nil } // this function must only be called while holding the lock diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 3c5dddea7d..ecd116d4ca 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -64,7 +64,12 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) { currentClusterStatus := cluster.Manager.IsPrimary() sort.Sort(ByTs(c.points)) cluster.Manager.SetPrimary(primary) - res := c.agg.Get(from, to) + res, err := c.agg.Get(from, to) + + if err != nil { + c.t.Fatalf("expected err nil, got %v", err) + } + // we don't do checking or fancy logic, it is assumed that the caller made sure first and last are ts of actual points var pi int // index of first point we want var pj int // index of last point we want diff --git a/mdata/aggregator_test.go b/mdata/aggregator_test.go index 9327409879..7a5a347655 100644 --- a/mdata/aggregator_test.go +++ b/mdata/aggregator_test.go @@ -45,7 +45,12 @@ func TestAggregator(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) compare := func(key string, metric Metric, expected []schema.Point) { cluster.Manager.SetPrimary(true) - res := metric.Get(0, 1000) + res, err := metric.Get(0, 1000) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + got := make([]schema.Point, 0, len(expected)) for _, iter := range res.Iters { for iter.Next() { diff --git a/mdata/cache/cache_mock.go b/mdata/cache/cache_mock.go index 2b4222cf19..8ffd171674 100644 --- a/mdata/cache/cache_mock.go +++ b/mdata/cache/cache_mock.go @@ -46,11 +46,11 @@ func (mc *MockCache) Stop() { mc.StopCount++ } -func (mc *MockCache) Search(ctx context.Context, metric schema.AMKey, from uint32, until uint32) *CCSearchResult { +func (mc *MockCache) Search(ctx context.Context, metric schema.AMKey, from uint32, until uint32) (*CCSearchResult, error) { mc.Lock() defer mc.Unlock() mc.SearchCount++ - return nil + return nil, nil } func (mc *MockCache) DelMetric(rawMetric schema.MKey) (int, int) { diff --git a/mdata/cache/ccache.go b/mdata/cache/ccache.go index d441dab2d8..e37cf13576 100644 --- a/mdata/cache/ccache.go +++ b/mdata/cache/ccache.go @@ -2,6 +2,7 @@ package cache import ( "context" + "errors" "flag" "runtime" "sync" @@ -17,8 +18,9 @@ import ( ) var ( - maxSize uint64 - searchFwdBug = stats.NewCounter32("recovered_errors.cache.metric.searchForwardBug") + maxSize uint64 + searchFwdBug = stats.NewCounter32("recovered_errors.cache.metric.searchForwardBug") + ErrInvalidRange = errors.New("CCache: invalid range: from must be less than to") ) func init() { @@ -197,28 +199,29 @@ func (c *CCache) evict(target *accnt.EvictTarget) { } } -func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until uint32) *CCSearchResult { +// Search looks for the requested 
metric and returns a complete-as-possible CCSearchResult +// from is inclusive, until is exclusive +func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until uint32) (*CCSearchResult, error) { ctx, span := tracing.NewSpan(ctx, c.tracer, "CCache.Search") defer span.Finish() - var hit chunk.IterGen - var cm *CCacheMetric - var ok bool + + if from >= until { + return nil, ErrInvalidRange + } + res := &CCSearchResult{ From: from, Until: until, } - if from == until { - return res - } - c.RLock() defer c.RUnlock() - if cm, ok = c.metricCache[metric]; !ok { + cm, ok := c.metricCache[metric] + if !ok { span.SetTag("cache", "miss") accnt.CacheMetricMiss.Inc() - return res + return res, nil } cm.Search(ctx, metric, res, from, until) @@ -229,10 +232,10 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui accnt.CacheChunkHit.Add(len(res.Start) + len(res.End)) go func() { - for _, hit = range res.Start { + for _, hit := range res.Start { c.accnt.HitChunk(metric, hit.Ts) } - for _, hit = range res.End { + for _, hit := range res.End { c.accnt.HitChunk(metric, hit.Ts) } }() @@ -246,5 +249,5 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui } } - return res + return res, nil } diff --git a/mdata/cache/ccache_metric.go b/mdata/cache/ccache_metric.go index afea1a57cd..77e7826ec6 100644 --- a/mdata/cache/ccache_metric.go +++ b/mdata/cache/ccache_metric.go @@ -16,10 +16,10 @@ import ( type CCacheMetric struct { sync.RWMutex - // points at cached data chunks, indexed by their according time stamp + // cached data chunks by timestamp chunks map[uint32]*CCacheChunk - // the list of chunk time stamps in ascending order + // chunk time stamps in ascending order keys []uint32 MKey schema.MKey @@ -243,10 +243,10 @@ func (mc *CCacheMetric) searchBackward(from, until uint32, res *CCSearchResult) } } -// Search searches for the given metric +// Search searches the CCacheMetric's data and returns a complete-as-possible CCSearchResult // -// the idea of this method is that we first look for the chunks where the -// "from" and "until" ts are in. then we seek from the "from" towards "until" +// we first look for the chunks where the "from" and "until" ts are in. 
+// then we seek from the "from" towards "until" // and add as many cunks as possible to the result, if this did not result // in all chunks necessary to serve the request we do the same in the reverse // order from "until" to "from" diff --git a/mdata/cache/ccache_test.go b/mdata/cache/ccache_test.go index 079f024906..68c3ed748f 100644 --- a/mdata/cache/ccache_test.go +++ b/mdata/cache/ccache_test.go @@ -222,7 +222,11 @@ func TestDisconnectedAdding(t *testing.T) { cc.Add(metric, 0, itgen2) cc.Add(metric, 0, itgen3) - res := cc.Search(test.NewContext(), metric, 900, 1015) + res, err := cc.Search(test.NewContext(), metric, 900, 1015) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to be false") @@ -256,7 +260,11 @@ func TestDisconnectedAddingByGuessing(t *testing.T) { cc.Add(metric, 1000, itgen2) cc.Add(metric, 0, itgen3) - res := cc.Search(test.NewContext(), metric, 900, 1015) + res, err := cc.Search(test.NewContext(), metric, 900, 1015) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to be false") @@ -292,7 +300,11 @@ func TestDisconnectedAddingByGuessing(t *testing.T) { func TestSearchFromBeginningComplete(t *testing.T) { metric := test.GetAMKey(1) cc := getConnectedChunks(t, metric) - res := cc.Search(test.NewContext(), metric, 1006, 1025) + res, err := cc.Search(test.NewContext(), metric, 1006, 1025) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if !res.Complete { t.Fatalf("complete is expected to be true") @@ -310,7 +322,11 @@ func TestSearchFromBeginningComplete(t *testing.T) { func TestSearchFromBeginningIncompleteEnd(t *testing.T) { metric := test.GetAMKey(1) cc := getConnectedChunks(t, metric) - res := cc.Search(test.NewContext(), metric, 1006, 1030) + res, err := cc.Search(test.NewContext(), metric, 1006, 1030) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to be false") } @@ -327,7 +343,11 @@ func TestSearchFromBeginningIncompleteEnd(t *testing.T) { func TestSearchFromEnd(t *testing.T) { metric := test.GetAMKey(1) cc := getConnectedChunks(t, metric) - res := cc.Search(test.NewContext(), metric, 500, 1025) + res, err := cc.Search(test.NewContext(), metric, 500, 1025) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to not be true") @@ -363,8 +383,6 @@ func TestSearchDisconnectedStartEndNonSpanaware(t *testing.T) { } func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { - var cc *CCache - var res *CCSearchResult metric := test.GetAMKey(1) values := []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} @@ -374,7 +392,7 @@ func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { itgen4 := getItgen(t, values, 1030, spanaware) itgen5 := getItgen(t, values, 1040, spanaware) itgen6 := getItgen(t, values, 1050, spanaware) - cc = NewCCache() + cc := NewCCache() for from := uint32(1000); from < 1010; from++ { // the end of ranges is exclusive, so we go up to 1060 @@ -397,7 +415,11 @@ func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { cc.Add(metric, 0, itgen1) } - res = cc.Search(test.NewContext(), metric, from, until) + res, err := cc.Search(test.NewContext(), metric, from, until) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if !res.Complete { t.Fatalf("from %d, until %d: complete is expected to be true", from, 
until) } @@ -439,8 +461,6 @@ func TestSearchDisconnectedWithGapStartEndNonSpanaware(t *testing.T) { func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bool) { metric := test.GetAMKey(1) - var cc *CCache - var res *CCSearchResult values := []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} itgen1 := getItgen(t, values, 1000, spanaware) @@ -450,7 +470,7 @@ func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bo itgen4 := getItgen(t, values, 1040, spanaware) itgen5 := getItgen(t, values, 1050, spanaware) itgen6 := getItgen(t, values, 1060, spanaware) - cc = NewCCache() + cc := NewCCache() for from := uint32(1000); from < 1010; from++ { // the end of ranges is exclusive, so we go up to 1060 @@ -473,7 +493,11 @@ func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bo cc.Add(metric, 0, itgen1) } - res = cc.Search(test.NewContext(), metric, from, until) + res, err := cc.Search(test.NewContext(), metric, from, until) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("from %d, until %d: complete is expected to be false", from, until) } @@ -520,8 +544,6 @@ func TestMetricDelete(t *testing.T) { } func testMetricDelete(t *testing.T, cc *CCache) { - var res *CCSearchResult - rawMetric1 := test.GetAMKey(1) metric1_1 := schema.GetAMKey(rawMetric1.MKey, schema.Cnt, 600) metric1_2 := schema.GetAMKey(rawMetric1.MKey, schema.Sum, 600) @@ -542,19 +564,33 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // check if Search returns them all for metric1_1 - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err := cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } // check if Search returns them all for metric1_2 - res = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } // check if Search returns them all for metric2_1 - res = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } @@ -572,19 +608,34 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // check if metric1_1 returns no results anymore - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // check if metric1_2 returns no results anymore - res = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + 
if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // but metric2_1 should still be there - res = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } @@ -596,7 +647,12 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // and check if it gets returned by Search again - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } @@ -614,19 +670,34 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // check if metric1_1 returns no results anymore - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // check if metric1_1 returns no results anymore - res = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // check if metric2_1 returns no results anymore - res = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } diff --git a/mdata/cache/if.go b/mdata/cache/if.go index 97d1b73afe..a01db55261 100644 --- a/mdata/cache/if.go +++ b/mdata/cache/if.go @@ -11,7 +11,7 @@ type Cache interface { Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen) CacheIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen) Stop() - Search(ctx context.Context, metric schema.AMKey, from, until uint32) *CCSearchResult + Search(ctx context.Context, metric schema.AMKey, from, until uint32) (*CCSearchResult, error) DelMetric(rawMetric schema.MKey) (int, int) Reset() (int, int) } @@ -21,22 +21,23 @@ type CachePusher interface { } type CCSearchResult struct { - // if this result is Complete == false, then the following cassandra query - // will need to use this value as from to fill in the missing data + // whether the whole request can be served from cache + Complete bool + + // if this result is not Complete, then the following store query + // will need to use this from value to fill in the missing data From uint32 - // just as with the above From, this will need to be used as the new until + // if this result is not Complete, then the following store query + // will need to use this until value to fill in the missing data Until uint32 - // if Complete is true then the whole request can be served 
from cache - Complete bool - // if the cache contained the chunk containing the original "from" ts then // this slice will hold it as the first element, plus all the subsequent // cached chunks. If Complete is true then all chunks are in this slice. Start []chunk.IterGen - // if complete is not true and the original "until" ts is in a cached chunk + // if this result is not Complete and the original "until" ts is in a cached chunk // then this slice will hold it as the first element, plus all the previous // ones in reverse order (because the search is seeking in reverse) End []chunk.IterGen diff --git a/mdata/cwr.go b/mdata/cwr.go index 90454f232f..0f7bbffe23 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -7,6 +7,7 @@ import ( "gopkg.in/raintank/schema.v1" ) +// ChunkWriteRequest is a request to write a chunk into a store type ChunkWriteRequest struct { Metric *AggMetric Key schema.AMKey @@ -16,6 +17,7 @@ type ChunkWriteRequest struct { Span uint32 } +// NewChunkWriteRequest creates a new ChunkWriteRequest func NewChunkWriteRequest(metric *AggMetric, key schema.AMKey, chunk *chunk.Chunk, ttl, span uint32, ts time.Time) ChunkWriteRequest { return ChunkWriteRequest{metric, key, chunk, ttl, ts, span} } diff --git a/mdata/ifaces.go b/mdata/ifaces.go index c8d012b9ea..0a33378e2e 100644 --- a/mdata/ifaces.go +++ b/mdata/ifaces.go @@ -17,13 +17,13 @@ type Metrics interface { type Metric interface { Add(ts uint32, val float64) - Get(from, to uint32) Result + Get(from, to uint32) (Result, error) GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error) } type Store interface { Add(cwr *ChunkWriteRequest) - Search(ctx context.Context, key schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error) + Search(ctx context.Context, key schema.AMKey, ttl, from, to uint32) ([]chunk.IterGen, error) Stop() SetTracer(t opentracing.Tracer) } diff --git a/stacktest/fakemetrics/out/out.go b/stacktest/fakemetrics/out/out.go index 0c8d97ce60..fcdd267c9f 100644 --- a/stacktest/fakemetrics/out/out.go +++ b/stacktest/fakemetrics/out/out.go @@ -7,11 +7,13 @@ import ( "gopkg.in/raintank/schema.v1" ) +// Out submits metricdata to a destination type Out interface { Close() error Flush(metrics []*schema.MetricData) error } +// OutStats tracks metrics related to an Output type OutStats struct { FlushDuration met.Timer // duration of Flush() PublishQueued met.Gauge // not every output uses this @@ -23,6 +25,7 @@ type OutStats struct { MessageMetrics met.Meter // number of metrics per message } +// NewStats creates a new OutStats func NewStats(stats met.Backend, output string) OutStats { return OutStats{ FlushDuration: stats.NewTimer(fmt.Sprintf("metricpublisher.out.%s.flush_duration", output), 0), diff --git a/store/cassandra/store_cassandra.go b/store/cassandra/store_cassandra.go index 91712d3300..bb23122267 100644 --- a/store/cassandra/store_cassandra.go +++ b/store/cassandra/store_cassandra.go @@ -33,12 +33,12 @@ const Month_sec = 60 * 60 * 24 * 28 const Table_name_format = `metric_%d` var ( - errChunkTooSmall = errors.New("unpossibly small chunk in cassandra") - errStartBeforeEnd = errors.New("start must be before end.") - errReadQueueFull = errors.New("the read queue is full") - errReadTooOld = errors.New("the read is too old") - errTableNotFound = errors.New("table for given TTL not found") - errCtxCanceled = errors.New("context canceled") + errChunkTooSmall = errors.New("impossibly small chunk in cassandra") + errInvalidRange = errors.New("CassandraStore: invalid range: 
from must be less than to") + errReadQueueFull = errors.New("the read queue is full") + errReadTooOld = errors.New("the read is too old") + errTableNotFound = errors.New("table for given TTL not found") + errCtxCanceled = errors.New("context canceled") // metric store.cassandra.get.exec is the duration of getting from cassandra store cassGetExecDuration = stats.NewLatencyHistogram15s32("store.cassandra.get.exec") @@ -469,10 +469,10 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key schema.AMKey, tabl tags.PeerService.Set(span, "cassandra") itgens := make([]chunk.IterGen, 0) - if start > end { + if start >= end { tracing.Failure(span) - tracing.Error(span, errStartBeforeEnd) - return itgens, errStartBeforeEnd + tracing.Error(span, errInvalidRange) + return itgens, errInvalidRange } pre := time.Now()