From d0c0066fff4f39c89eb0cd8bcc67464086e896cf Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Thu, 17 May 2018 14:07:11 +0200 Subject: [PATCH 01/10] Fix(): remove redundant checks, clarify --- api/dataprocessor.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index 5311e4d7bb..7bc9a9ad20 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -93,12 +93,13 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point { } else if p.Ts > t { // point is too recent, append a null and reconsider same point for next slot out[o] = schema.Point{Val: math.NaN(), Ts: t} - } else if p.Ts > t-interval && p.Ts < t { - // point is a bit older, so it's good enough, just quantize the ts, and move on to next point for next round + } else if p.Ts > t-interval { + // point is older but not by more than 1 interval, so it's good enough, just quantize the ts, and move on to next point for next round out[o] = schema.Point{Val: p.Val, Ts: t} i++ - } else if p.Ts <= t-interval { - // point is too old. advance until we find a point that is recent enough, and then go through the considerations again, + } else { + // point is too old (older by 1 interval or more). + // advance until we find a point that is recent enough, and then go through the considerations again, // if those considerations are any of the above ones. // if the last point would end up in this branch again, discard it as well. for p.Ts <= t-interval && i < len(in)-1 { From 0dcd31b0ea496428322e643092748b7943d4defa Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Thu, 17 May 2018 14:10:12 +0200 Subject: [PATCH 02/10] better comments --- mdata/aggmetric.go | 5 ++++- mdata/cache/ccache.go | 1 + mdata/cache/ccache_metric.go | 10 +++++----- mdata/cache/if.go | 15 ++++++++------- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index dad0d51dc6..7145adf0c2 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -185,7 +185,10 @@ func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSp // Get all data between the requested time ranges. From is inclusive, to is exclusive. 
from <= x < to // more data then what's requested may be included -// also returns oldest point we have, so that if your query needs data before it, the caller knows when to query cassandra +// specifically, returns: +// * points from the ROB (if enabled) +// * iters from matching chunks +// * oldest point we have, so that if your query needs data before it, the caller knows when to query the store func (a *AggMetric) Get(from, to uint32) Result { pre := time.Now() if LogLevel < 2 { diff --git a/mdata/cache/ccache.go b/mdata/cache/ccache.go index d441dab2d8..157001c1ef 100644 --- a/mdata/cache/ccache.go +++ b/mdata/cache/ccache.go @@ -197,6 +197,7 @@ func (c *CCache) evict(target *accnt.EvictTarget) { } } +// Search looks for the requested metric and returns a complete-as-possible CCSearchResult func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until uint32) *CCSearchResult { ctx, span := tracing.NewSpan(ctx, c.tracer, "CCache.Search") defer span.Finish() diff --git a/mdata/cache/ccache_metric.go b/mdata/cache/ccache_metric.go index afea1a57cd..77e7826ec6 100644 --- a/mdata/cache/ccache_metric.go +++ b/mdata/cache/ccache_metric.go @@ -16,10 +16,10 @@ import ( type CCacheMetric struct { sync.RWMutex - // points at cached data chunks, indexed by their according time stamp + // cached data chunks by timestamp chunks map[uint32]*CCacheChunk - // the list of chunk time stamps in ascending order + // chunk time stamps in ascending order keys []uint32 MKey schema.MKey @@ -243,10 +243,10 @@ func (mc *CCacheMetric) searchBackward(from, until uint32, res *CCSearchResult) } } -// Search searches for the given metric +// Search searches the CCacheMetric's data and returns a complete-as-possible CCSearchResult // -// the idea of this method is that we first look for the chunks where the -// "from" and "until" ts are in. then we seek from the "from" towards "until" +// we first look for the chunks where the "from" and "until" ts are in. +// then we seek from the "from" towards "until" // and add as many cunks as possible to the result, if this did not result // in all chunks necessary to serve the request we do the same in the reverse // order from "until" to "from" diff --git a/mdata/cache/if.go b/mdata/cache/if.go index 97d1b73afe..d47168d4ac 100644 --- a/mdata/cache/if.go +++ b/mdata/cache/if.go @@ -21,22 +21,23 @@ type CachePusher interface { } type CCSearchResult struct { - // if this result is Complete == false, then the following cassandra query - // will need to use this value as from to fill in the missing data + // whether the whole request can be served from cache + Complete bool + + // if this result is not Complete, then the following store query + // will need to use this from value to fill in the missing data From uint32 - // just as with the above From, this will need to be used as the new until + // if this result is not Complete, then the following store query + // will need to use this until value to fill in the missing data Until uint32 - // if Complete is true then the whole request can be served from cache - Complete bool - // if the cache contained the chunk containing the original "from" ts then // this slice will hold it as the first element, plus all the subsequent // cached chunks. If Complete is true then all chunks are in this slice. 
 	Start []chunk.IterGen

-	// if complete is not true and the original "until" ts is in a cached chunk
+	// if this result is not Complete and the original "until" ts is in a cached chunk
 	// then this slice will hold it as the first element, plus all the previous
 	// ones in reverse order (because the search is seeking in reverse)
 	End []chunk.IterGen

From ce9c4fc94b6f74340eb062a96d9612d904243753 Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Fri, 18 May 2018 10:46:23 +0200
Subject: [PATCH 03/10] fix generateChunks

it happened to work because the only caller used an end that divides
evenly by the span. But now it works in all cases
---
 api/dataprocessor_test.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go
index 5c0ce49b8a..0b78344ad0 100644
--- a/api/dataprocessor_test.go
+++ b/api/dataprocessor_test.go
@@ -447,17 +447,24 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk {
 	var chunks []chunk.Chunk
 	c := chunk.New(start)
-	for i := start; i < end; i++ {
-		c.Push(i, float64((i-start)*2))
-		if (i+1)%span == 0 {
-			// Mark the chunk that just got finished as finished
+	for ts := start; ts < end; ts++ {
+		val := float64((ts - start) * 2)
+		c.Push(ts, val)
+		// handle the case of this being the last point for this chunk
+		if (ts+1)%span == 0 {
 			c.Finish()
 			chunks = append(chunks, *c)
-			if i < end {
-				c = chunk.New(i + 1)
+			// if there will be a next iteration, prepare the chunk
+			if ts+1 < end {
+				c = chunk.New(ts + 1)
 			}
 		}
 	}
+	// if end was not quantized we have to finish the last chunk
+	if !c.Closed {
+		c.Finish()
+		chunks = append(chunks, *c)
+	}
 	return chunks
 }

From 586fc11ba09a96497bd3af5e3e219abb243900a6 Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Fri, 18 May 2018 10:47:53 +0200
Subject: [PATCH 04/10] make TestGetSeriesCachedStore more comprehensible

---
 api/dataprocessor_test.go | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go
index 0b78344ad0..3136c76c19 100644
--- a/api/dataprocessor_test.go
+++ b/api/dataprocessor_test.go
@@ -486,10 +486,9 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk {
 //
 func TestGetSeriesCachedStore(t *testing.T) {
 	span := uint32(600)
-	// save some electrons by skipping steps that are no edge cases
-	steps := span / 10
 	start := span
 	// we want 10 chunks to serve the largest testcase
+	// they will have t0 600, 1200, ..., 5400, 6000
 	end := span * 11
 	chunks := generateChunks(span, start, end)
@@ -505,7 +504,10 @@ func TestGetSeriesCachedStore(t *testing.T) {
 	var prevts uint32

 	type testcase struct {
-		// the pattern of chunks in store, cache or both
+		// the pattern of chunks
+		// c: in cache
+		// s: in store
+		// b: in both
 		Pattern string

 		// expected number of cache hits on query over all chunks
@@ -528,7 +530,7 @@ func TestGetSeriesCachedStore(t *testing.T) {
 	for _, tc := range testcases {
 		pattern := tc.Pattern

-		// last ts is start ts plus the number of spans the pattern defines
+		// lastTs is the t0 of the first chunk that comes after the used range
 		lastTs := start + span*uint32(len(pattern))

 		// we want to query through various ranges, including:
 		// - from first ts to first ts
 		// - from first ts to last ts
 		// - from last ts to last ts
 		// and various ranges between
-		for from := start; from <= lastTs; from += steps {
-			for to := from; to <= lastTs; to += steps {
-				// reinstantiate the cache at the beginning of each run
+		// we increment from and to in tenths of a span,
+		// because incrementing by 1 would be needlessly expensive
+		step := span / 10
+		for from := start; from <= lastTs; from += step {
+			for to := from; to <= lastTs; to += step {
+				// use fresh store and cache
 				c = cache.NewCCache()
 				srv.BindCache(c)
+				store.Reset()

 				// populate cache and store according to pattern definition
 				prevts = 0
@@ -666,7 +672,6 @@ func TestGetSeriesCachedStore(t *testing.T) {

 				// stop cache go routines before reinstantiating it at the top of the loop
 				c.Stop()
-				store.Reset()
 			}
 		}
 	}

From 9a26a612d9367cb5008d75903b71fccd2b45847a Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Thu, 17 May 2018 14:49:43 +0200
Subject: [PATCH 05/10] cleanup from-to range error handling

AggMetric, Store, and Cache all support retrieving a range of data.
ranges are defined by a from and a to, where from is inclusive and to is
exclusive. thus valid ranges must have from < to, with the special case
of from = to - 1, which selects exactly one 1-second slot.
in particular:
* use uniform error messages across all 3
* AggMetric.Get(): when hitting an unexpected nil chunk, return an error.
  if we have corruption in our in-memory chunklist, I'd rather return an
  error to the client than try to query the store, to prevent cascading
  trouble. That seems pretty rare considering the chunk cache does most of
  the work, and now that we return errors instead of panicking, this is
  cleaner anyway.
* in cache.Search treat from >= to as an error
---
 api/dataprocessor.go               |   7 +-
 api/dataprocessor_test.go          |  94 +++++++++++-----------
 mdata/aggmetric.go                 |  35 +++++----
 mdata/aggmetric_test.go            |   7 +-
 mdata/aggregator_test.go           |   7 +-
 mdata/cache/cache_mock.go          |   4 +-
 mdata/cache/ccache.go              |  17 ++--
 mdata/cache/ccache_test.go         | 121 +++++++++++++++++++++------
 mdata/cache/if.go                  |   2 +-
 mdata/ifaces.go                    |   4 +-
 store/cassandra/store_cassandra.go |  18 ++---
 11 files changed, 203 insertions(+), 113 deletions(-)

diff --git a/api/dataprocessor.go b/api/dataprocessor.go
index 7bc9a9ad20..41af4c6c1e 100644
--- a/api/dataprocessor.go
+++ b/api/dataprocessor.go
@@ -482,7 +482,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error)
 	if ctx.Cons != consolidation.None {
 		return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
 	} else {
-		return metric.Get(ctx.From, ctx.To), nil
+		return metric.Get(ctx.From, ctx.To)
 	}
 }

@@ -501,7 +501,10 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
 	logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)

 	log.Debug("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
-	cacheRes := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
+	cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
+	if err != nil {
+		return iters, err
+	}
 	log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))

 	// check to see if the request has been canceled, if so abort now.
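As an aside, a minimal sketch of the calling contract this patch settles on
(the wiring here is illustrative and not part of the diff; `metric` stands
for any mdata.Metric):

	// from is inclusive, to is exclusive, and from must be less than to;
	// from = to - 1 selects exactly one 1-second slot.
	res, err := metric.Get(60, 61)
	if err != nil {
		// e.g. mdata.ErrInvalidRange (from >= to) or
		// mdata.ErrNilChunk (corrupt in-memory chunk list)
		return err
	}
	_ = res // use res.Points, res.Iters, res.Oldest as before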
diff --git a/api/dataprocessor_test.go b/api/dataprocessor_test.go index 3136c76c19..cd0a40c2f0 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -567,6 +567,14 @@ func TestGetSeriesCachedStore(t *testing.T) { req.ArchInterval = 1 ctx := newRequestContext(test.NewContext(), &req, consolidation.None) iters, err := srv.getSeriesCachedStore(ctx, to) + + // test invalid query; from must be less than to + if from == to { + if err == nil { + t.Fatalf("Pattern %s From=To %d: expected err, got nil", pattern, from) + } + continue + } if err != nil { t.Fatalf("Pattern %s From %d To %d: error %s", pattern, from, to, err) } @@ -593,18 +601,14 @@ func TestGetSeriesCachedStore(t *testing.T) { } } - if to-from > 0 { - if len(tsSlice) == 0 { - t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to) - } - if tsSlice[0] != expectResFrom { - t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0]) - } - if tsSlice[len(tsSlice)-1] != expectResTo { - t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1]) - } - } else if len(tsSlice) > 0 { - t.Fatalf("Pattern %s From %d To %d; Expected results to have len 0 but got %d", pattern, from, to, len(tsSlice)) + if len(tsSlice) == 0 { + t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to) + } + if tsSlice[0] != expectResFrom { + t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0]) + } + if tsSlice[len(tsSlice)-1] != expectResTo { + t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1]) } expectedHits := uint32(0) @@ -612,55 +616,51 @@ func TestGetSeriesCachedStore(t *testing.T) { // because ranges are exclusive at the end we'll test for to - 1 exclTo := to - 1 - // if from is equal to we always expect 0 hits - if from != to { + // seek hits from beginning of the searched ranged within the given pattern + for i := 0; i < len(pattern); i++ { + + // if pattern index is lower than from's chunk we continue + if from-(from%span) > start+uint32(i)*span { + continue + } + + // current pattern index is a cache hit, so we expect one more + if pattern[i] == 'c' || pattern[i] == 'b' { + expectedHits++ + } else { + break + } + + // if we've already seeked beyond to's pattern we break and mark the seek as complete + if exclTo-(exclTo%span) == start+uint32(i)*span { + complete = true + break + } + } + + // only if the previous seek was not complete we launch one from the other end + if !complete { - // seek hits from beginning of the searched ranged within the given pattern - for i := 0; i < len(pattern); i++ { + // now the same from the other end (just like the cache searching does) + for i := len(pattern) - 1; i >= 0; i-- { - // if pattern index is lower than from's chunk we continue - if from-(from%span) > start+uint32(i)*span { + // if pattern index is above to's chunk we continue + if exclTo-(exclTo%span)+span <= start+uint32(i)*span { continue } - // current pattern index is a cache hit, so we expect one more + // current pattern index is a cache hit, so we expecte one more if pattern[i] == 'c' || pattern[i] == 'b' { expectedHits++ } else { break } - // if we've already seeked beyond to's pattern we break and mark the seek as complete - if exclTo-(exclTo%span) == start+uint32(i)*span { - complete = true + // if 
we've already seeked beyond from's pattern we break + if from-(from%span) == start+uint32(i)*span { break } } - - // only if the previous seek was not complete we launch one from the other end - if !complete { - - // now the same from the other end (just like the cache searching does) - for i := len(pattern) - 1; i >= 0; i-- { - - // if pattern index is above to's chunk we continue - if exclTo-(exclTo%span)+span <= start+uint32(i)*span { - continue - } - - // current pattern index is a cache hit, so we expecte one more - if pattern[i] == 'c' || pattern[i] == 'b' { - expectedHits++ - } else { - break - } - - // if we've already seeked beyond from's pattern we break - if from-(from%span) == start+uint32(i)*span { - break - } - } - } } // verify we got all cache hits we should have diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index 7145adf0c2..a7ccbbb2f6 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -16,6 +16,9 @@ import ( "gopkg.in/raintank/schema.v1" ) +var ErrInvalidRange = errors.New("AggMetric: invalid range. from must < to") +var ErrNilChunk = errors.New("AggMetric: unexpected nil chunk") + // AggMetric takes in new values, updates the in-memory data and streams the points to aggregators // it uses a circular buffer of chunks // each chunk starts at their respective t0 @@ -174,7 +177,7 @@ func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSp if agg == nil { return Result{}, fmt.Errorf("Consolidator %q not configured", consolidator) } - return agg.Get(from, to), nil + return agg.Get(from, to) } } err := fmt.Errorf("internal error: AggMetric.GetAggregated(): unknown aggSpan %d", aggSpan) @@ -189,13 +192,13 @@ func (a *AggMetric) GetAggregated(consolidator consolidation.Consolidator, aggSp // * points from the ROB (if enabled) // * iters from matching chunks // * oldest point we have, so that if your query needs data before it, the caller knows when to query the store -func (a *AggMetric) Get(from, to uint32) Result { +func (a *AggMetric) Get(from, to uint32) (Result, error) { pre := time.Now() if LogLevel < 2 { log.Debug("AM %s Get(): %d - %d (%s - %s) span:%ds", a.Key, from, to, TS(from), TS(to), to-from-1) } if from >= to { - panic("invalid request. to must > from") + return Result{}, ErrInvalidRange } a.RLock() defer a.RUnlock() @@ -209,7 +212,7 @@ func (a *AggMetric) Get(from, to uint32) Result { if len(result.Points) > 0 { result.Oldest = result.Points[0].Ts if result.Oldest <= from { - return result + return result, nil } } } @@ -219,7 +222,7 @@ func (a *AggMetric) Get(from, to uint32) Result { if LogLevel < 2 { log.Debug("AM %s Get(): no data for requested range.", a.Key) } - return result + return result, nil } newestChunk := a.getChunk(a.CurrentChunkPos) @@ -240,7 +243,7 @@ func (a *AggMetric) Get(from, to uint32) Result { log.Debug("AM %s Get(): no data for requested range.", a.Key) } result.Oldest = from - return result + return result, nil } // get the oldest chunk we have. @@ -261,8 +264,8 @@ func (a *AggMetric) Get(from, to uint32) Result { oldestChunk := a.getChunk(oldestPos) if oldestChunk == nil { - log.Error(3, "unexpected nil chunk.") - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } // The first chunk is likely only a partial chunk. 
If we are not the primary node @@ -275,8 +278,8 @@ func (a *AggMetric) Get(from, to uint32) Result { } oldestChunk = a.getChunk(oldestPos) if oldestChunk == nil { - log.Error(3, "unexpected nil chunk.") - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } } @@ -286,7 +289,7 @@ func (a *AggMetric) Get(from, to uint32) Result { log.Debug("AM %s Get(): no data for requested range", a.Key) } result.Oldest = oldestChunk.T0 - return result + return result, nil } // Find the oldest Chunk that the "from" ts falls in. If from extends before the oldest @@ -298,9 +301,9 @@ func (a *AggMetric) Get(from, to uint32) Result { } oldestChunk = a.getChunk(oldestPos) if oldestChunk == nil { - log.Error(3, "unexpected nil chunk.") result.Oldest = to - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } } @@ -318,9 +321,9 @@ func (a *AggMetric) Get(from, to uint32) Result { } newestChunk = a.getChunk(newestPos) if newestChunk == nil { - log.Error(3, "unexpected nil chunk.") result.Oldest = to - return result + log.Error(3, "%s", ErrNilChunk) + return result, ErrNilChunk } } @@ -341,7 +344,7 @@ func (a *AggMetric) Get(from, to uint32) Result { memToIterDuration.Value(time.Now().Sub(pre)) result.Oldest = oldestChunk.T0 - return result + return result, nil } // this function must only be called while holding the lock diff --git a/mdata/aggmetric_test.go b/mdata/aggmetric_test.go index 3c5dddea7d..ecd116d4ca 100644 --- a/mdata/aggmetric_test.go +++ b/mdata/aggmetric_test.go @@ -64,7 +64,12 @@ func (c *Checker) Verify(primary bool, from, to, first, last uint32) { currentClusterStatus := cluster.Manager.IsPrimary() sort.Sort(ByTs(c.points)) cluster.Manager.SetPrimary(primary) - res := c.agg.Get(from, to) + res, err := c.agg.Get(from, to) + + if err != nil { + c.t.Fatalf("expected err nil, got %v", err) + } + // we don't do checking or fancy logic, it is assumed that the caller made sure first and last are ts of actual points var pi int // index of first point we want var pj int // index of last point we want diff --git a/mdata/aggregator_test.go b/mdata/aggregator_test.go index 9327409879..7a5a347655 100644 --- a/mdata/aggregator_test.go +++ b/mdata/aggregator_test.go @@ -45,7 +45,12 @@ func TestAggregator(t *testing.T) { cluster.Init("default", "test", time.Now(), "http", 6060) compare := func(key string, metric Metric, expected []schema.Point) { cluster.Manager.SetPrimary(true) - res := metric.Get(0, 1000) + res, err := metric.Get(0, 1000) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + got := make([]schema.Point, 0, len(expected)) for _, iter := range res.Iters { for iter.Next() { diff --git a/mdata/cache/cache_mock.go b/mdata/cache/cache_mock.go index 2b4222cf19..8ffd171674 100644 --- a/mdata/cache/cache_mock.go +++ b/mdata/cache/cache_mock.go @@ -46,11 +46,11 @@ func (mc *MockCache) Stop() { mc.StopCount++ } -func (mc *MockCache) Search(ctx context.Context, metric schema.AMKey, from uint32, until uint32) *CCSearchResult { +func (mc *MockCache) Search(ctx context.Context, metric schema.AMKey, from uint32, until uint32) (*CCSearchResult, error) { mc.Lock() defer mc.Unlock() mc.SearchCount++ - return nil + return nil, nil } func (mc *MockCache) DelMetric(rawMetric schema.MKey) (int, int) { diff --git a/mdata/cache/ccache.go b/mdata/cache/ccache.go index 157001c1ef..0a0cbd7656 100644 --- a/mdata/cache/ccache.go +++ b/mdata/cache/ccache.go @@ -2,6 +2,7 @@ package cache import ( "context" + "errors" "flag" "runtime" "sync" @@ -17,8 +18,9 @@ 
import ( ) var ( - maxSize uint64 - searchFwdBug = stats.NewCounter32("recovered_errors.cache.metric.searchForwardBug") + maxSize uint64 + searchFwdBug = stats.NewCounter32("recovered_errors.cache.metric.searchForwardBug") + ErrInvalidRange = errors.New("CCache: invalid range. from must < to") ) func init() { @@ -198,7 +200,8 @@ func (c *CCache) evict(target *accnt.EvictTarget) { } // Search looks for the requested metric and returns a complete-as-possible CCSearchResult -func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until uint32) *CCSearchResult { +// from is inclusive, until is exclusive +func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until uint32) (*CCSearchResult, error) { ctx, span := tracing.NewSpan(ctx, c.tracer, "CCache.Search") defer span.Finish() var hit chunk.IterGen @@ -209,8 +212,8 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui Until: until, } - if from == until { - return res + if from >= until { + return nil, ErrInvalidRange } c.RLock() @@ -219,7 +222,7 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui if cm, ok = c.metricCache[metric]; !ok { span.SetTag("cache", "miss") accnt.CacheMetricMiss.Inc() - return res + return res, nil } cm.Search(ctx, metric, res, from, until) @@ -247,5 +250,5 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui } } - return res + return res, nil } diff --git a/mdata/cache/ccache_test.go b/mdata/cache/ccache_test.go index 079f024906..68c3ed748f 100644 --- a/mdata/cache/ccache_test.go +++ b/mdata/cache/ccache_test.go @@ -222,7 +222,11 @@ func TestDisconnectedAdding(t *testing.T) { cc.Add(metric, 0, itgen2) cc.Add(metric, 0, itgen3) - res := cc.Search(test.NewContext(), metric, 900, 1015) + res, err := cc.Search(test.NewContext(), metric, 900, 1015) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to be false") @@ -256,7 +260,11 @@ func TestDisconnectedAddingByGuessing(t *testing.T) { cc.Add(metric, 1000, itgen2) cc.Add(metric, 0, itgen3) - res := cc.Search(test.NewContext(), metric, 900, 1015) + res, err := cc.Search(test.NewContext(), metric, 900, 1015) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to be false") @@ -292,7 +300,11 @@ func TestDisconnectedAddingByGuessing(t *testing.T) { func TestSearchFromBeginningComplete(t *testing.T) { metric := test.GetAMKey(1) cc := getConnectedChunks(t, metric) - res := cc.Search(test.NewContext(), metric, 1006, 1025) + res, err := cc.Search(test.NewContext(), metric, 1006, 1025) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if !res.Complete { t.Fatalf("complete is expected to be true") @@ -310,7 +322,11 @@ func TestSearchFromBeginningComplete(t *testing.T) { func TestSearchFromBeginningIncompleteEnd(t *testing.T) { metric := test.GetAMKey(1) cc := getConnectedChunks(t, metric) - res := cc.Search(test.NewContext(), metric, 1006, 1030) + res, err := cc.Search(test.NewContext(), metric, 1006, 1030) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to be false") } @@ -327,7 +343,11 @@ func TestSearchFromBeginningIncompleteEnd(t *testing.T) { func TestSearchFromEnd(t *testing.T) { metric := test.GetAMKey(1) cc := getConnectedChunks(t, metric) - res := cc.Search(test.NewContext(), metric, 500, 1025) + res, err := 
cc.Search(test.NewContext(), metric, 500, 1025) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("complete is expected to not be true") @@ -363,8 +383,6 @@ func TestSearchDisconnectedStartEndNonSpanaware(t *testing.T) { } func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { - var cc *CCache - var res *CCSearchResult metric := test.GetAMKey(1) values := []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} @@ -374,7 +392,7 @@ func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { itgen4 := getItgen(t, values, 1030, spanaware) itgen5 := getItgen(t, values, 1040, spanaware) itgen6 := getItgen(t, values, 1050, spanaware) - cc = NewCCache() + cc := NewCCache() for from := uint32(1000); from < 1010; from++ { // the end of ranges is exclusive, so we go up to 1060 @@ -397,7 +415,11 @@ func testSearchDisconnectedStartEnd(t *testing.T, spanaware, ascending bool) { cc.Add(metric, 0, itgen1) } - res = cc.Search(test.NewContext(), metric, from, until) + res, err := cc.Search(test.NewContext(), metric, from, until) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if !res.Complete { t.Fatalf("from %d, until %d: complete is expected to be true", from, until) } @@ -439,8 +461,6 @@ func TestSearchDisconnectedWithGapStartEndNonSpanaware(t *testing.T) { func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bool) { metric := test.GetAMKey(1) - var cc *CCache - var res *CCSearchResult values := []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} itgen1 := getItgen(t, values, 1000, spanaware) @@ -450,7 +470,7 @@ func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bo itgen4 := getItgen(t, values, 1040, spanaware) itgen5 := getItgen(t, values, 1050, spanaware) itgen6 := getItgen(t, values, 1060, spanaware) - cc = NewCCache() + cc := NewCCache() for from := uint32(1000); from < 1010; from++ { // the end of ranges is exclusive, so we go up to 1060 @@ -473,7 +493,11 @@ func testSearchDisconnectedWithGapStartEnd(t *testing.T, spanaware, ascending bo cc.Add(metric, 0, itgen1) } - res = cc.Search(test.NewContext(), metric, from, until) + res, err := cc.Search(test.NewContext(), metric, from, until) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if res.Complete { t.Fatalf("from %d, until %d: complete is expected to be false", from, until) } @@ -520,8 +544,6 @@ func TestMetricDelete(t *testing.T) { } func testMetricDelete(t *testing.T, cc *CCache) { - var res *CCSearchResult - rawMetric1 := test.GetAMKey(1) metric1_1 := schema.GetAMKey(rawMetric1.MKey, schema.Cnt, 600) metric1_2 := schema.GetAMKey(rawMetric1.MKey, schema.Sum, 600) @@ -542,19 +564,33 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // check if Search returns them all for metric1_1 - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err := cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } // check if Search returns them all for metric1_2 - res = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } if len(res.Start) != itgenCount { t.Fatalf("Expected 
to have %d values, got %d", itgenCount, len(res.Start)) } // check if Search returns them all for metric2_1 - res = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } @@ -572,19 +608,34 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // check if metric1_1 returns no results anymore - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // check if metric1_2 returns no results anymore - res = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // but metric2_1 should still be there - res = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } @@ -596,7 +647,12 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // and check if it gets returned by Search again - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != itgenCount { t.Fatalf("Expected to have %d values, got %d", itgenCount, len(res.Start)) } @@ -614,19 +670,34 @@ func testMetricDelete(t *testing.T, cc *CCache) { } // check if metric1_1 returns no results anymore - res = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // check if metric1_1 returns no results anymore - res = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric1_2, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } // check if metric2_1 returns no results anymore - res = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + res, err = cc.Search(test.NewContext(), metric2_1, 1000, uint32(1000+itgenCount*len(values))) + + if err != nil { + t.Fatalf("expected err nil, got %v", err) + } + if len(res.Start) != 0 { t.Fatalf("Expected to have %d values, got %d", 0, len(res.Start)) } diff --git a/mdata/cache/if.go 
b/mdata/cache/if.go
index d47168d4ac..a01db55261 100644
--- a/mdata/cache/if.go
+++ b/mdata/cache/if.go
@@ -11,7 +11,7 @@ type Cache interface {
 	Add(metric schema.AMKey, prev uint32, itergen chunk.IterGen)
 	CacheIfHot(metric schema.AMKey, prev uint32, itergen chunk.IterGen)
 	Stop()
-	Search(ctx context.Context, metric schema.AMKey, from, until uint32) *CCSearchResult
+	Search(ctx context.Context, metric schema.AMKey, from, until uint32) (*CCSearchResult, error)
 	DelMetric(rawMetric schema.MKey) (int, int)
 	Reset() (int, int)
 }
diff --git a/mdata/ifaces.go b/mdata/ifaces.go
index c8d012b9ea..0a33378e2e 100644
--- a/mdata/ifaces.go
+++ b/mdata/ifaces.go
@@ -17,13 +17,13 @@ type Metrics interface {
 type Metric interface {
 	Add(ts uint32, val float64)
-	Get(from, to uint32) Result
+	Get(from, to uint32) (Result, error)
 	GetAggregated(consolidator consolidation.Consolidator, aggSpan, from, to uint32) (Result, error)
 }

 type Store interface {
 	Add(cwr *ChunkWriteRequest)
-	Search(ctx context.Context, key schema.AMKey, ttl, start, end uint32) ([]chunk.IterGen, error)
+	Search(ctx context.Context, key schema.AMKey, ttl, from, to uint32) ([]chunk.IterGen, error)
 	Stop()
 	SetTracer(t opentracing.Tracer)
 }
diff --git a/store/cassandra/store_cassandra.go b/store/cassandra/store_cassandra.go
index 91712d3300..fd9fa36af8 100644
--- a/store/cassandra/store_cassandra.go
+++ b/store/cassandra/store_cassandra.go
@@ -33,12 +33,12 @@ const Month_sec = 60 * 60 * 24 * 28
 const Table_name_format = `metric_%d`

 var (
-	errChunkTooSmall  = errors.New("unpossibly small chunk in cassandra")
-	errStartBeforeEnd = errors.New("start must be before end.")
-	errReadQueueFull  = errors.New("the read queue is full")
-	errReadTooOld     = errors.New("the read is too old")
-	errTableNotFound  = errors.New("table for given TTL not found")
-	errCtxCanceled    = errors.New("context canceled")
+	errChunkTooSmall = errors.New("unpossibly small chunk in cassandra")
+	errInvalidRange  = errors.New("CassandraStore: invalid range. from must < to")
+	errReadQueueFull = errors.New("the read queue is full")
+	errReadTooOld    = errors.New("the read is too old")
+	errTableNotFound = errors.New("table for given TTL not found")
+	errCtxCanceled   = errors.New("context canceled")

 	// metric store.cassandra.get.exec is the duration of getting from cassandra
 	cassGetExecDuration = stats.NewLatencyHistogram15s32("store.cassandra.get.exec")
@@ -469,10 +469,10 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key schema.AMKey, tabl
 	tags.PeerService.Set(span, "cassandra")

 	itgens := make([]chunk.IterGen, 0)
-	if start > end {
+	if start >= end {
 		tracing.Failure(span)
-		tracing.Error(span, errStartBeforeEnd)
-		return itgens, errStartBeforeEnd
+		tracing.Error(span, errInvalidRange)
+		return itgens, errInvalidRange
 	}

 	pre := time.Now()

From d85527c74c09ce92731518a851b85b7af22937db Mon Sep 17 00:00:00 2001
From: Dieter Plaetinck
Date: Thu, 17 May 2018 20:27:20 +0200
Subject: [PATCH 06/10] bugfix: don't break intra-adjacent-timestamp range requests.

if interval is, say, 120s then you can expect points with these
timestamps: 120 240 360 ...
previously, if you issued a render request such as from=300,until=330
on a raw archive (which still needs quantizing, so we do some tricks),
those tricks would result in bad request errors. Now we handle them as
they should: return [].
fix #912
---
 api/dataprocessor.go | 32 ++++++++++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/api/dataprocessor.go b/api/dataprocessor.go
index 41af4c6c1e..e25a643417 100644
--- a/api/dataprocessor.go
+++ b/api/dataprocessor.go
@@ -395,6 +395,10 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
 	default:
 	}
 	rctx := newRequestContext(ctx, &req, consolidator)
+	// see newRequestContext for a detailed explanation of this.
+	if rctx.From == rctx.To {
+		return nil, nil
+	}
 	res, err := s.getSeries(rctx)
 	if err != nil {
 		return nil, err
@@ -660,8 +664,32 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
 	// STORED DATA 0[----------60][---------120][---------180][---------240] but data for 60 may be at 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing)
 	// to retrieve the stored data, we also use from inclusive and to exclusive,
 	// so to make sure that the data after quantization (Fix()) is correct, we have to make the following adjustment:
-	// `from` 1..60 needs data 1..60 -> always adjust `from` to previous boundary+1 (here 1)
-	// `to` 181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181)
+	// `from` 1..60 needs data 1..60 -> to assure we can read that data we adjust `from` to previous boundary+1 (here 1). (will be quantized to next boundary in this case 60)
+	// `to` 181..240 needs data 121..180 -> to avoid reading needless data we adjust `to` to previous boundary+1 (here 181), last ts returned must be 180
+
+	// except... there's a special case. let's say archinterval=60, and user requests:
+	// from=25, to=36
+	// we know that in the graphite model there will be no data in that timeframe:
+	// maybe the user submitted a point with ts=30 but it will be quantized to ts=60 so it is out of range.
+	// but wouldn't it be nice to include it anyway?
+	// we can argue both ways, but the other thing is that if we apply the logic above, we end up with:
+	// from := 1
+	// to := 1
+	// which is a query not accepted by AggMetric, Ccache or store. (from must < to)
+	// the only way we can get acceptable queries is for from to be 1 and to to remain 36 (or become 60 or 61)
+	// such a fetch request would include the requested point
+	// but we know Fix() will later create the output according to these rules:
+	// * first point should have the first timestamp >= from that divides by interval (that would be 60 in this case)
+	// * last point should have the last timestamp < to that divides by interval (because to is always exclusive) (that would be 0 in this case)
+	// which wouldn't make sense of course. one could argue it should output one point with ts=60,
+	// but to do that, we have to "broaden" the `to` requested by the user, covering a larger time frame they didn't actually request.
+	// and we deviate from the quantizing model.
+	// I think we should just stick to the quantizing model.
+
+	// we can do the logic above backwards: if from and to are adjusted to the same value, such as 181, it means `from` was 181..240 and `to` was 181..240
+	// which is either a nonsensical request (from > to, from == to) or from < to but such that the requested timeframe falls in between two adjacent quantized
+	// timestamps and could not include either of them.
+	// so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client.
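+	// to make the adjustment concrete, here is a worked example (the numbers are illustrative, matching the case above):
+	// with ArchInterval=60, req.From=25 and req.To=36:
+	//   rc.From = prevBoundary(25, 60) + 1 = 1
+	//   rc.To   = prevBoundary(36, 60) + 1 = 1
+	// rc.From == rc.To, so getSeriesFixed returns [] right away, without querying memory, cache or store.
+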
if consolidator == consolidation.None { rc.From = prevBoundary(req.From, req.ArchInterval) + 1 From ef7bd6678514d9064b6eb9fd916e305fa2b98095 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Thu, 17 May 2018 20:45:39 +0200 Subject: [PATCH 07/10] misc doc updates --- api/dataprocessor.go | 21 ++++++++++++++------- api/graphite.go | 1 + 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index e25a643417..e9bc2f8039 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -198,6 +198,8 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se return out, nil } +// getTargetsRemote issues the requests on other nodes +// it's nothing more than a thin network wrapper around getTargetsLocal of a peer. func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) { responses := make(chan getTargetsResp, len(remoteReqs)) rCtx, cancel := context.WithCancel(ctx) @@ -387,6 +389,9 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) { } } +// getSeriesFixed gets the series and makes sure the output is quantized +// (needed because the raw chunks don't contain quantized data) +// TODO: we can probably forego Fix if archive > 0 func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) { select { case <-ctx.Done(): @@ -413,6 +418,9 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato return Fix(res.Points, req.From, req.To, req.ArchInterval), nil } +// getSeries returns points from mem (and store if needed), within the range from (inclusive) - to (exclusive) +// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval +// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series. func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { res, err := s.getSeriesAggMetrics(ctx) if err != nil { @@ -436,6 +444,7 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { // if oldest < to -> search until oldest, we already have the rest from mem // if to < oldest -> no need to search until oldest, only search until to + // adds iters from both the cache and the store (if applicable) until := util.Min(res.Oldest, ctx.To) fromCache, err := s.getSeriesCachedStore(ctx, until) if err != nil { @@ -445,10 +454,8 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) { return res, nil } -// getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive) -// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval -// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series. -// all data will also be quantized. 
+// itersToPoints converts the iters to points if they are within the from/to range +// TODO: just work on the result directly func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema.Point { pre := time.Now() @@ -539,7 +546,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun default: } - // the request cannot completely be served from cache, it will require cassandra involvement + // the request cannot completely be served from cache, it will require store involvement if !cacheRes.Complete { if cacheRes.From != cacheRes.Until { storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until) @@ -559,11 +566,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun if err != nil { // TODO(replay) figure out what to do if one piece is corrupt tracing.Failure(span) - tracing.Errorf(span, "itergen: error getting iter from cassandra slice %+v", err) + tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err) return iters, err } // it's important that the itgens get added in chronological order, - // currently we rely on cassandra returning results in order + // currently we rely on store returning results in order s.Cache.Add(ctx.AMKey, prevts, itgen) prevts = itgen.Ts iters = append(iters, *it) diff --git a/api/graphite.go b/api/graphite.go index db690537e9..463dd0e7ec 100644 --- a/api/graphite.go +++ b/api/graphite.go @@ -149,6 +149,7 @@ func (s *Server) findSeriesLocal(ctx context.Context, orgId uint32, patterns []s return result, nil } +// findSeriesRemote calls findSeriesLocal on a peer via http rpc func (s *Server) findSeriesRemote(ctx context.Context, orgId uint32, patterns []string, seenAfter int64, peer cluster.Node) ([]Series, error) { log.Debug("HTTP Render querying %s/index/find for %d:%q", peer.GetName(), orgId, patterns) data := models.IndexFind{ From 28cfeeb6b95343a1395518de63b89d4950103a6e Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Thu, 17 May 2018 23:10:34 +0200 Subject: [PATCH 08/10] more userfriendly error --- api/dataprocessor.go | 2 +- mdata/aggmetric.go | 2 +- mdata/cache/ccache.go | 2 +- store/cassandra/store_cassandra.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/api/dataprocessor.go b/api/dataprocessor.go index e9bc2f8039..7b5a5a3a3e 100644 --- a/api/dataprocessor.go +++ b/api/dataprocessor.go @@ -682,7 +682,7 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol // we can argue both ways, but the other thing is that if we apply the logic above, we end up with: // from := 1 // to := 1 - // which is a query not accepted by AggMetric, Ccache or store. (from must < to) + // which is a query not accepted by AggMetric, Ccache or store. (from must be less than to) // the only way we can get acceptable queries is for from to be 1 and to to remain 36 (or become 60 or 61) // such a fetch request would include the requested point // but we know Fix() will later create the output according to these rules: diff --git a/mdata/aggmetric.go b/mdata/aggmetric.go index a7ccbbb2f6..26fc72394d 100644 --- a/mdata/aggmetric.go +++ b/mdata/aggmetric.go @@ -16,7 +16,7 @@ import ( "gopkg.in/raintank/schema.v1" ) -var ErrInvalidRange = errors.New("AggMetric: invalid range. 
from must < to") +var ErrInvalidRange = errors.New("AggMetric: invalid range: from must be less than to") var ErrNilChunk = errors.New("AggMetric: unexpected nil chunk") // AggMetric takes in new values, updates the in-memory data and streams the points to aggregators diff --git a/mdata/cache/ccache.go b/mdata/cache/ccache.go index 0a0cbd7656..9279892fa6 100644 --- a/mdata/cache/ccache.go +++ b/mdata/cache/ccache.go @@ -20,7 +20,7 @@ import ( var ( maxSize uint64 searchFwdBug = stats.NewCounter32("recovered_errors.cache.metric.searchForwardBug") - ErrInvalidRange = errors.New("CCache: invalid range. from must < to") + ErrInvalidRange = errors.New("CCache: invalid range: from must be less than to") ) func init() { diff --git a/store/cassandra/store_cassandra.go b/store/cassandra/store_cassandra.go index fd9fa36af8..dfb42bf24a 100644 --- a/store/cassandra/store_cassandra.go +++ b/store/cassandra/store_cassandra.go @@ -34,7 +34,7 @@ const Table_name_format = `metric_%d` var ( errChunkTooSmall = errors.New("unpossibly small chunk in cassandra") - errInvalidRange = errors.New("CassandraStore: invalid range. from must < to") + errInvalidRange = errors.New("CassandraStore: invalid range: from must be less than to") errReadQueueFull = errors.New("the read queue is full") errReadTooOld = errors.New("the read is too old") errTableNotFound = errors.New("table for given TTL not found") From e3c1aeda19917129f1e347e2d794ddc3c43f73e9 Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Fri, 18 May 2018 13:51:03 +0200 Subject: [PATCH 09/10] kick circle --- mdata/cwr.go | 2 ++ stacktest/fakemetrics/out/out.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/mdata/cwr.go b/mdata/cwr.go index 90454f232f..0f7bbffe23 100644 --- a/mdata/cwr.go +++ b/mdata/cwr.go @@ -7,6 +7,7 @@ import ( "gopkg.in/raintank/schema.v1" ) +// ChunkWriteRequest is a request to write a chunk into a store type ChunkWriteRequest struct { Metric *AggMetric Key schema.AMKey @@ -16,6 +17,7 @@ type ChunkWriteRequest struct { Span uint32 } +// NewChunkWriteRequest creates a new ChunkWriteRequest func NewChunkWriteRequest(metric *AggMetric, key schema.AMKey, chunk *chunk.Chunk, ttl, span uint32, ts time.Time) ChunkWriteRequest { return ChunkWriteRequest{metric, key, chunk, ttl, ts, span} } diff --git a/stacktest/fakemetrics/out/out.go b/stacktest/fakemetrics/out/out.go index 0c8d97ce60..fcdd267c9f 100644 --- a/stacktest/fakemetrics/out/out.go +++ b/stacktest/fakemetrics/out/out.go @@ -7,11 +7,13 @@ import ( "gopkg.in/raintank/schema.v1" ) +// Out submits metricdata to a destination type Out interface { Close() error Flush(metrics []*schema.MetricData) error } +// OutStats tracks metrics related to an Output type OutStats struct { FlushDuration met.Timer // duration of Flush() PublishQueued met.Gauge // not every output uses this @@ -23,6 +25,7 @@ type OutStats struct { MessageMetrics met.Meter // number of metrics per message } +// NewStats creates a new OutStats func NewStats(stats met.Backend, output string) OutStats { return OutStats{ FlushDuration: stats.NewTimer(fmt.Sprintf("metricpublisher.out.%s.flush_duration", output), 0), From 3823c2b986be58274a8d518a6504f662fb84871c Mon Sep 17 00:00:00 2001 From: Dieter Plaetinck Date: Sat, 19 May 2018 12:07:13 +0200 Subject: [PATCH 10/10] cleanup --- api/dataprocessor_test.go | 11 ++++------- mdata/cache/ccache.go | 19 +++++++++---------- store/cassandra/store_cassandra.go | 2 +- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/api/dataprocessor_test.go 
b/api/dataprocessor_test.go index cd0a40c2f0..48c4bb1893 100644 --- a/api/dataprocessor_test.go +++ b/api/dataprocessor_test.go @@ -499,9 +499,6 @@ func TestGetSeriesCachedStore(t *testing.T) { metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0) srv.BindMemoryStore(metrics) metric := test.GetAMKey(1) - var c *cache.CCache - var itgen *chunk.IterGen - var prevts uint32 type testcase struct { // the pattern of chunks @@ -544,14 +541,14 @@ func TestGetSeriesCachedStore(t *testing.T) { for from := start; from <= lastTs; from += step { for to := from; to <= lastTs; to += step { // use fresh store and cache - c = cache.NewCCache() + c := cache.NewCCache() srv.BindCache(c) store.Reset() // populate cache and store according to pattern definition - prevts = 0 + var prevts uint32 for i := 0; i < len(tc.Pattern); i++ { - itgen = chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span) + itgen := chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span) if pattern[i] == 'c' || pattern[i] == 'b' { c.Add(metric, prevts, *itgen) } @@ -589,7 +586,7 @@ func TestGetSeriesCachedStore(t *testing.T) { // we use the tsTracker to increase together with the iterators and compare at each step tsTracker := expectResFrom - tsSlice := make([]uint32, 0) + var tsSlice []uint32 for i, it := range iters { for it.Next() { ts, _ := it.Values() diff --git a/mdata/cache/ccache.go b/mdata/cache/ccache.go index 9279892fa6..e37cf13576 100644 --- a/mdata/cache/ccache.go +++ b/mdata/cache/ccache.go @@ -204,22 +204,21 @@ func (c *CCache) evict(target *accnt.EvictTarget) { func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until uint32) (*CCSearchResult, error) { ctx, span := tracing.NewSpan(ctx, c.tracer, "CCache.Search") defer span.Finish() - var hit chunk.IterGen - var cm *CCacheMetric - var ok bool - res := &CCSearchResult{ - From: from, - Until: until, - } if from >= until { return nil, ErrInvalidRange } + res := &CCSearchResult{ + From: from, + Until: until, + } + c.RLock() defer c.RUnlock() - if cm, ok = c.metricCache[metric]; !ok { + cm, ok := c.metricCache[metric] + if !ok { span.SetTag("cache", "miss") accnt.CacheMetricMiss.Inc() return res, nil @@ -233,10 +232,10 @@ func (c *CCache) Search(ctx context.Context, metric schema.AMKey, from, until ui accnt.CacheChunkHit.Add(len(res.Start) + len(res.End)) go func() { - for _, hit = range res.Start { + for _, hit := range res.Start { c.accnt.HitChunk(metric, hit.Ts) } - for _, hit = range res.End { + for _, hit := range res.End { c.accnt.HitChunk(metric, hit.Ts) } }() diff --git a/store/cassandra/store_cassandra.go b/store/cassandra/store_cassandra.go index dfb42bf24a..bb23122267 100644 --- a/store/cassandra/store_cassandra.go +++ b/store/cassandra/store_cassandra.go @@ -33,7 +33,7 @@ const Month_sec = 60 * 60 * 24 * 28 const Table_name_format = `metric_%d` var ( - errChunkTooSmall = errors.New("unpossibly small chunk in cassandra") + errChunkTooSmall = errors.New("impossibly small chunk in cassandra") errInvalidRange = errors.New("CassandraStore: invalid range: from must be less than to") errReadQueueFull = errors.New("the read queue is full") errReadTooOld = errors.New("the read is too old")
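To round things off, a minimal usage sketch of the final, error-returning
Search contract as it stands after this series (test-style wiring; ctx and
amKey are assumed to be a context.Context and a schema.AMKey obtained
elsewhere, e.g. via test.NewContext and test.GetAMKey):

	cc := cache.NewCCache()
	defer cc.Stop()

	// from >= until is now rejected up front instead of silently returning an empty result
	_, err := cc.Search(ctx, amKey, 1000, 1000)
	// err == cache.ErrInvalidRange

	res, err := cc.Search(ctx, amKey, 1000, 1060)
	if err != nil {
		return err
	}
	if !res.Complete {
		// query the store for the missing range: res.From (inclusive) .. res.Until (exclusive)
	}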