Commit 5b78a26
Merge pull request #1926 from grafana/maxSeries-non-tagged
apply max-series-per-req to non-tagged queries
2 parents: 8f037dd + 596df84

17 files changed: +250 -115 lines

api/ccache.go

+1 -1

@@ -49,7 +49,7 @@ func (s *Server) ccacheDelete(ctx *middleware.Context, req models.CCacheDelete)
 	var toClear []idx.Node
 	if len(req.Patterns) > 0 {
 		for _, pattern := range req.Patterns {
-			nodes, err := s.MetricIndex.Find(req.OrgId, pattern, 0)
+			nodes, err := s.MetricIndex.Find(req.OrgId, pattern, 0, 0)
 			if err != nil {
 				res.AddError(err)
 				code = http.StatusInternalServerError

api/cluster.go

+38 -18

@@ -128,7 +128,7 @@ func (s *Server) indexFind(ctx *middleware.Context, req models.IndexFind) {
 	}
 
 	for _, pattern := range req.Patterns {
-		nodes, err := s.MetricIndex.Find(req.OrgId, pattern, req.From)
+		nodes, err := s.MetricIndex.Find(req.OrgId, pattern, req.From, req.Limit)
 		if err != nil {
 			response.Write(ctx, response.WrapError(err))
 			return
@@ -471,20 +471,18 @@ func (s *Server) queryAllPeers(ctx context.Context, data cluster.Traceable, name
 	return result, errors
 }
 
-// queryAllShards takes a request and the path to request it on, then fans it out
+// queryAllShards takes a function and calls it for one peer in each shard
 // across the cluster. If any peer fails, we try another replica. If enough
 // peers have been heard from (based on speculation-threshold configuration), and we
 // are missing the others, try to speculatively query other members of the shard group.
+// all responses are collected and returned at once.
 // ctx: request context
-// data: request to be submitted
 // name: name to be used in logging & tracing
-// path: path to request on
-func (s *Server) queryAllShards(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {
+// fetchFunc: function to call to fetch the data from a peer
+func (s *Server) queryAllShards(ctx context.Context, name string, fetchFn fetchFunc) (map[string]PeerResponse, error) {
 	result := make(map[string]PeerResponse)
 
-	responseChan, errorChan := s.queryAllShardsGeneric(ctx, name, func(reqCtx context.Context, peer cluster.Node) (interface{}, error) {
-		return peer.Post(reqCtx, name, path, data)
-	})
+	responseChan, errorChan := s.queryAllShardsGeneric(ctx, name, fetchFn)
 
 	for resp := range responseChan {
 		result[resp.peer.GetName()] = PeerResponse{
@@ -501,9 +499,11 @@ func (s *Server) queryAllShards(ctx context.Context, data cluster.Traceable, nam
 // across the cluster. If any peer fails, we try another replica. If enough
 // peers have been heard from (based on speculation-threshold configuration), and we
 // are missing the others, try to speculatively query other members of the shard group.
+// all responses and errors are streamed through the returned channels
 // ctx: request context
+// name: name to be used in logging & tracing
 // fetchFunc: function to call to fetch the data from a peer
-func (s *Server) queryAllShardsGeneric(ctx context.Context, name string, fetchFunc func(context.Context, cluster.Node) (interface{}, error)) (<-chan GenericPeerResponse, <-chan error) {
+func (s *Server) queryAllShardsGeneric(ctx context.Context, name string, fetchFn fetchFunc) (<-chan GenericPeerResponse, <-chan error) {
 	peerGroups, err := cluster.MembersForSpeculativeQuery()
 	if err != nil {
 		log.Errorf("HTTP peerQuery unable to get peers, %s", err.Error())
@@ -513,10 +513,22 @@ func (s *Server) queryAllShardsGeneric(ctx context.Context, name string, fetchFu
 		return resultChan, errorChan
 	}
 
-	return queryPeers(ctx, peerGroups, name, fetchFunc)
+	return queryPeers(ctx, peerGroups, name, fetchFn)
 }
 
-type fetchFunc func(context.Context, cluster.Node) (interface{}, error)
+// fetchFunc is a function to query the given cluster.Node
+// the list of all nodes in the cluster is passed as well for additional context
+// Example: fetchFunc can use this to determine the ratio of how much data the target peer owns
+// compared to the cluster as a whole. Caveat: this is based on live cluster state. If shardgroups
+// go completely down it'll look like the target peer owns more of the cluster than it actually does.
+// if query limits are set based on this, the limits would loosen up as shards leave the cluster.
+type fetchFunc func(context.Context, cluster.Node, map[int32][]cluster.Node) (interface{}, error)
+
+func fetchFuncPost(data cluster.Traceable, name, path string) fetchFunc {
+	return func(reqCtx context.Context, peer cluster.Node, peerGroups map[int32][]cluster.Node) (interface{}, error) {
+		return peer.Post(reqCtx, name, path, data)
+	}
+}
 
 type shardResponse struct {
 	shardGroup int32
@@ -532,20 +544,20 @@ type shardState struct {
 }
 
 // AskPeer issues the query on the next peer, if available, and returns it
-func (state *shardState) AskPeer(ctx context.Context, fn fetchFunc, responses chan shardResponse) (cluster.Node, bool) {
+func (state *shardState) AskPeer(ctx context.Context, peerGroups map[int32][]cluster.Node, fn fetchFunc, responses chan shardResponse) (cluster.Node, bool) {
 	if len(state.remainingPeers) == 0 {
 		return nil, false
 	}
 	peer := state.remainingPeers[0]
 	state.remainingPeers = state.remainingPeers[1:]
 	state.inflight++
-	go state.askPeer(ctx, peer, fn, responses)
+	go state.askPeer(ctx, peerGroups, peer, fn, responses)
 	return peer, true
 }
 
-func (state *shardState) askPeer(ctx context.Context, peer cluster.Node, fetchFn fetchFunc, responses chan shardResponse) {
+func (state *shardState) askPeer(ctx context.Context, peerGroups map[int32][]cluster.Node, peer cluster.Node, fetchFn fetchFunc, responses chan shardResponse) {
 	//log.Debugf("HTTP Render querying %s%s", peer.GetName(), path)
-	resp, err := fetchFn(ctx, peer)
+	resp, err := fetchFn(ctx, peer, peerGroups)
 	select {
 	case <-ctx.Done():
 		return
@@ -593,7 +605,7 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
 			shard:          shard,
 			remainingPeers: peers,
 		}
-		peer, _ := state.AskPeer(reqCtx, fetchFn, responses) // thanks to the above check we always know there was a peer available
+		peer, _ := state.AskPeer(reqCtx, peerGroups, fetchFn, responses) // thanks to the above check we always know there was a peer available
 		originalPeers[peer.GetName()] = struct{}{}
 		states[shard] = state
 	}
@@ -621,8 +633,16 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
 		}
 
 		if resp.err != nil {
+			if resp.err.Error() == "400 Bad Request" {
+				// if we got bad request, then retrying it on a different replica will result in the same
+				// Cancel the reqCtx, which will cancel all in-flight requests.
+				cancel()
+				errorChan <- resp.err
+				return
+			}
+
			// if we can try another peer for this shardGroup, do it
-			_, ok := states[resp.shardGroup].AskPeer(reqCtx, fetchFn, responses)
+			_, ok := states[resp.shardGroup].AskPeer(reqCtx, peerGroups, fetchFn, responses)
 			if ok {
 				speculativeRequests.Inc()
 				continue
@@ -656,7 +676,7 @@ func queryPeers(ctx context.Context, peerGroups map[int32][]cluster.Node, name s
 			continue
 		}
 
-		if _, ok := states[shardGroup].AskPeer(specCtx, fetchFn, responses); ok {
+		if _, ok := states[shardGroup].AskPeer(specCtx, peerGroups, fetchFn, responses); ok {
 			speculativeRequests.Inc()
 		}
 	}
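The comment on the new fetchFunc type warns that partition ownership is computed from live cluster state, so limits derived from it loosen as shards leave. A small standalone sketch (all figures invented) to make that concrete:

```go
package main

import "fmt"

func main() {
	const maxSeries = 1200
	ourParts := 8 // partitions owned by the peer we are about to query

	// healthy cluster: 4 shardgroups x 8 partitions each -> the peer owns 1/4
	totalParts := 32
	fmt.Println(maxSeries * ourParts / totalParts) // 300

	// one shardgroup completely down: it vanishes from peerGroups, so the
	// same peer now appears to own 1/3 and its per-peer limit loosens up
	totalParts = 24
	fmt.Println(maxSeries * ourParts / totalParts) // 400
}
```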

api/dataprocessor.go

+1 -1

@@ -239,7 +239,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats,
 	rCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	resultChan, errorChan := queryPeers(rCtx, requiredPeers, "getTargetsRemote", func(ctx context.Context, node cluster.Node) (interface{}, error) {
+	resultChan, errorChan := queryPeers(rCtx, requiredPeers, "getTargetsRemote", func(ctx context.Context, node cluster.Node, peerGroups map[int32][]cluster.Node) (interface{}, error) {
 		var resp models.GetDataRespV1
 		reqs, ok := shardReqs[node.GetPartitions()[0]]
 		if !ok {

api/graphite.go

+45 -16

@@ -100,14 +100,29 @@ type Series struct {
 	Node    cluster.Node
 }
 
-func (s *Server) findSeries(ctx context.Context, orgId uint32, patterns []string, seenAfter int64) ([]Series, error) {
-	data := models.IndexFind{
-		Patterns: patterns,
-		OrgId:    orgId,
-		From:     seenAfter,
+func (s *Server) findSeries(ctx context.Context, orgId uint32, patterns []string, seenAfter int64, maxSeries int) ([]Series, error) {
+
+	fetchFn := func(reqCtx context.Context, peer cluster.Node, peerGroups map[int32][]cluster.Node) (interface{}, error) {
+		ourParts := len(peer.GetPartitions())
+
+		// assign a fractional maxSeries limit (not global, but relative to how much data the peer has)
+		// look at each shardgroup and check how many partitions it has
+		// (we assume each shardgroup is consistent across different peers for that shardgroup)
+		var totalParts int
+		for _, otherPeers := range peerGroups {
+			if len(otherPeers) > 0 {
+				totalParts += len(otherPeers[0].GetPartitions())
+			}
+		}
+		data := models.IndexFind{
+			Patterns: patterns,
+			OrgId:    orgId,
+			From:     seenAfter,
+			Limit:    int64(maxSeries * ourParts / totalParts),
+		}
+		return peer.Post(reqCtx, "findSeriesRemote", "/index/find", data)
 	}
-
-	resps, err := s.queryAllShards(ctx, data, "findSeriesRemote", "/index/find")
+	resps, err := s.queryAllShards(ctx, "findSeriesRemote", fetchFn)
 	if err != nil {
 		return nil, err
 	}
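As a worked example of the fractional limit (figures invented): with maxSeries = 800 in a cluster of four shardgroups of eight partitions each (totalParts = 32), each shardgroup's peer is asked for at most 800 × 8 / 32 = 200 series, i.e. the global budget is split in proportion to the data each shard owns. Integer division truncates, so very small budgets spread over many shards round down.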
@@ -121,12 +136,23 @@ func (s *Server) findSeries(ctx context.Context, orgId uint32, patterns []string
 	series := make([]Series, 0)
 	resp := models.IndexFindResp{}
 	for _, r := range resps {
+		if len(series) == maxSeries {
+			return nil, response.NewError(
+				http.StatusRequestEntityTooLarge,
+				fmt.Sprintf("Request exceeds max-series-per-req limit (%d). Reduce the number of targets or ask your admin to increase the limit.", maxSeriesPerReq))
+		}
 		_, err = resp.UnmarshalMsg(r.buf)
 		if err != nil {
 			return nil, err
 		}
 
 		for pattern, nodes := range resp.Nodes {
+			if len(series) == maxSeries {
+				return nil, response.NewError(
+					http.StatusRequestEntityTooLarge,
+					fmt.Sprintf("Request exceeds max-series-per-req limit (%d). Reduce the number of targets or ask your admin to increase the limit.", maxSeriesPerReq))
+			}
+
 			series = append(series, Series{
 				Pattern: pattern,
 				Node:    r.peer,
@@ -330,7 +356,7 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
 	}
 	nodes := make([]idx.Node, 0)
 	reqCtx := ctx.Req.Context()
-	series, err := s.findSeries(reqCtx, ctx.OrgId, []string{request.Query}, int64(fromUnix))
+	series, err := s.findSeries(reqCtx, ctx.OrgId, []string{request.Query}, int64(fromUnix), maxSeriesPerReq)
 	if err != nil {
 		response.Write(ctx, response.WrapError(err))
 		return
@@ -378,7 +404,7 @@ func (s *Server) metricsExpand(ctx *middleware.Context, request models.GraphiteE
 	for i, query := range request.Query {
 		i, query := i, query
 		g.Go(func() error {
-			series, err := s.findSeries(errGroupCtx, ctx.OrgId, []string{query}, 0)
+			series, err := s.findSeries(errGroupCtx, ctx.OrgId, []string{query}, 0, maxSeriesPerReq)
 			if err != nil {
 				return err
 			}
@@ -795,7 +821,10 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan *expr.Plan)
 		}
 		series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From), maxSeriesPerReq-int(reqs.cnt), false)
 	} else {
-		series, err = s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From))
+		// find limit is the limit minus what we already consumed for other targets, adjusted for how many rawReqs we folded into this resolveSeriesRequest
+		// note that this doesn't account for duplicate requests like target=foo&target=foo because those are both represented by the same rawReq (see NewPlan)
+		findLimit := (maxSeriesPerReq - int(reqs.cnt)) / len(rawReqs)
+		series, err = s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From), findLimit)
 	}
 	if err != nil {
 		return nil, meta, err
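A quick worked example of this budgeting (figures invented): with maxSeriesPerReq = 1000, tag-based targets having already consumed reqs.cnt = 200 series, and len(rawReqs) = 4 raw requests folded into the plan, each findSeries call gets findLimit = (1000 − 200) / 4 = 200, which findSeries then subdivides across shards in proportion to partition ownership as shown earlier.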
@@ -1046,7 +1075,7 @@ func (s *Server) clusterTagDetails(ctx context.Context, orgId uint32, tag, filte
 	result := make(map[string]uint64)
 
 	data := models.IndexTagDetails{OrgId: orgId, Tag: tag, Filter: filter}
-	resps, err := s.queryAllShards(ctx, data, "clusterTagDetails", "/index/tag_details")
+	resps, err := s.queryAllShards(ctx, "clusterTagDetails", fetchFuncPost(data, "clusterTagDetails", "/index/tag_details"))
 	if err != nil {
 		return nil, err
 	}
@@ -1147,7 +1176,7 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
 	newCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	responseChan, errorChan := s.queryAllShardsGeneric(newCtx, "clusterFindByTag",
-		func(reqCtx context.Context, peer cluster.Node) (interface{}, error) {
+		func(reqCtx context.Context, peer cluster.Node, peerGroups map[int32][]cluster.Node) (interface{}, error) {
 			resp := models.IndexFindByTagResp{}
 			body, err := peer.PostRaw(reqCtx, "clusterFindByTag", "/index/find_by_tag", data)
 			if body == nil || err != nil {
@@ -1222,7 +1251,7 @@ func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTa
 
 func (s *Server) clusterTags(ctx context.Context, orgId uint32, filter string) ([]string, error) {
 	data := models.IndexTags{OrgId: orgId, Filter: filter}
-	resps, err := s.queryAllShards(ctx, data, "clusterTags", "/index/tags")
+	resps, err := s.queryAllShards(ctx, "clusterTags", fetchFuncPost(data, "clusterTags", "/index/tags"))
 	if err != nil {
 		return nil, err
 	}
@@ -1274,7 +1303,7 @@ func (s *Server) clusterAutoCompleteTags(ctx context.Context, orgId uint32, pref
 	tagSet := make(map[string]struct{})
 
 	data := models.IndexAutoCompleteTags{OrgId: orgId, Prefix: prefix, Expr: expressions, Limit: limit}
-	responses, err := s.queryAllShards(ctx, data, "clusterAutoCompleteTags", "/index/tags/autoComplete/tags")
+	responses, err := s.queryAllShards(ctx, "clusterAutoCompleteTags", fetchFuncPost(data, "clusterAutoCompleteTags", "/index/tags/autoComplete/tags"))
 	if err != nil {
 		return nil, err
 	}
@@ -1321,7 +1350,7 @@ func (s *Server) clusterAutoCompleteTagValues(ctx context.Context, orgId uint32,
 	valSet := make(map[string]struct{})
 
 	data := models.IndexAutoCompleteTagValues{OrgId: orgId, Tag: tag, Prefix: prefix, Expr: expressions, Limit: limit}
-	responses, err := s.queryAllShards(ctx, data, "clusterAutoCompleteValues", "/index/tags/autoComplete/values")
+	responses, err := s.queryAllShards(ctx, "clusterAutoCompleteValues", fetchFuncPost(data, "clusterAutoCompleteValues", "/index/tags/autoComplete/values"))
 	if err != nil {
 		return nil, err
 	}
@@ -1352,7 +1381,7 @@ func (s *Server) clusterAutoCompleteTagValues(ctx context.Context, orgId uint32,
 
 func (s *Server) graphiteTagTerms(ctx *middleware.Context, request models.GraphiteTagTerms) {
 	data := models.IndexTagTerms{OrgId: ctx.OrgId, Tags: request.Tags, Expr: request.Expr}
-	responses, err := s.queryAllShards(ctx.Req.Context(), data, "graphiteTagTerms", "/index/tags/terms")
+	responses, err := s.queryAllShards(ctx.Req.Context(), "graphiteTagTerms", fetchFuncPost(data, "graphiteTagTerms", "/index/tags/terms"))
 	if err != nil {
 		response.Write(ctx, response.WrapErrorForTagDB(err))
 		return

api/models/node.go

+1

@@ -166,6 +166,7 @@ type IndexFind struct {
 	Patterns []string `json:"patterns" form:"patterns" binding:"Required"`
 	OrgId    uint32   `json:"orgId" form:"orgId" binding:"Required"`
 	From     int64    `json:"from" form:"from"`
+	Limit    int64    `json:"limit"`
 }
 
 func (i IndexFind) Trace(span opentracing.Span) {
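For illustration, a caller-side sketch of the extended struct (values invented; findSeries in api/graphite.go populates it the same way):

```go
// ask a peer's index for at most 250 matching series (values invented)
data := models.IndexFind{
	Patterns: []string{"collectd.*.cpu.*"},
	OrgId:    1,
	From:     0,   // no lastUpdate cutoff
	Limit:    250, // new in this commit
}
```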

cmd-dev/mt-simulate-memory-idx-lock-contention/runner/test_runner.go

+1 -1

@@ -222,7 +222,7 @@ func (t *TestRun) runQuery(pattern string, wg *sync.WaitGroup, active chan struc
 	}()
 	pre := time.Now()
 	active <- struct{}{}
-	_, err := t.index.Find(orgID, pattern, 0)
+	_, err := t.index.Find(orgID, pattern, 0, 0)
 	if err != nil {
 		log.Printf("Warning: Query failed with error: %s", err)
 	}

idx/bigtable/bigtable.go

+2 -2

@@ -260,15 +260,15 @@ func (b *BigtableIdx) updateBigtable(now uint32, inMemory bool, archive idx.Arch
 	return archive
 }
 
-func (b *BigtableIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node, error) {
+func (b *BigtableIdx) Find(orgId uint32, pattern string, from, limit int64) ([]idx.Node, error) {
 	// The lastUpdate timestamp does not get updated in the bigtable index every time when
 	// a data point is received, there can be a delay of up to b.cfg.updateInterval32. To
 	// avoid falsely excluding a metric based on its lastUpdate timestamp we offset the
 	// from time by updateInterval32, this way we err on the "too inclusive" side
 	if from > int64(b.cfg.updateInterval32) {
 		from -= int64(b.cfg.updateInterval32)
 	}
-	return b.MemoryIndex.Find(orgId, pattern, from)
+	return b.MemoryIndex.Find(orgId, pattern, from, limit)
 }
 
 func (b *BigtableIdx) rebuildIndex() {

idx/cassandra/cassandra.go

+2 -2

@@ -297,15 +297,15 @@ func (c *CasIdx) updateCassandra(now uint32, inMemory bool, archive idx.Archive,
 	return archive
 }
 
-func (c *CasIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node, error) {
+func (c *CasIdx) Find(orgId uint32, pattern string, from, limit int64) ([]idx.Node, error) {
 	// The lastUpdate timestamp does not get updated in the cassandra index every time when
 	// a data point is received, there can be a delay of up to c.updateInterval32. To avoid
 	// falsely excluding a metric based on its lastUpdate timestamp we offset the from time
 	// by updateInterval32, this way we err on the "too inclusive" side
 	if from > int64(c.updateInterval32) {
 		from -= int64(c.updateInterval32)
 	}
-	return c.MemoryIndex.Find(orgId, pattern, from)
+	return c.MemoryIndex.Find(orgId, pattern, from, limit)
 }
 
 func (c *CasIdx) rebuildIndex() {
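The MemoryIndex.Find implementation that actually enforces the new limit argument is not part of the hunks shown here. Judging from the call sites above (which pass 0 when no cap is intended) and the new 400 Bad Request short-circuit in queryPeers, the semantics are presumably along these lines (a hypothetical sketch, not the actual implementation):

```go
// capNodes illustrates the assumed limit semantics: limit <= 0 means
// unlimited, and exceeding the limit is an error, which the serving peer
// would surface as a 400 Bad Request that queryPeers no longer retries.
func capNodes(nodes []idx.Node, limit int64) ([]idx.Node, error) {
	if limit > 0 && int64(len(nodes)) > limit {
		return nil, fmt.Errorf("find: result count %d exceeds limit %d", len(nodes), limit)
	}
	return nodes, nil
}
```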
