@@ -100,7 +100,9 @@ type Series struct {
100
100
Node cluster.Node
101
101
}
102
102
103
- func (s * Server ) findSeries (ctx context.Context , orgId uint32 , patterns []string , seenAfter int64 , maxSeries int ) ([]Series , error ) {
103
+ // findSeries returns for each requested pattern, a list of index nodes (collected across the entire cluster), these can be both leaves or branches
104
+ // whether or not the caller wants only leaves, the limit only applies to leaves (series), not branches
105
+ func (s * Server ) findSeries (ctx context.Context , orgId uint32 , patterns []string , seenAfter int64 , leavesOnly bool , maxSeries int ) ([]Series , error ) {
104
106
105
107
fetchFn := func (reqCtx context.Context , peer cluster.Node , peerGroups map [int32 ][]cluster.Node ) (interface {}, error ) {
106
108
ourParts := len (peer .GetPartitions ())
@@ -128,46 +130,55 @@ func (s *Server) findSeries(ctx context.Context, orgId uint32, patterns []string
128
130
}
129
131
return peer .Post (reqCtx , "findSeriesRemote" , "/index/find" , data )
130
132
}
131
- resps , err := s .queryAllShards (ctx , "findSeriesRemote" , fetchFn )
132
- if err != nil {
133
- return nil , err
134
- }
135
- select {
136
- case <- ctx .Done ():
137
- //request canceled
138
- return nil , nil
139
- default :
140
- }
141
133
134
+ ctx , cancel := context .WithCancel (ctx )
135
+ defer cancel ()
136
+
137
+ var leavesCnt int
142
138
series := make ([]Series , 0 )
143
- resp := models.IndexFindResp {}
144
- for _ , r := range resps {
145
- if len (series ) == maxSeries {
146
- return nil , response .NewError (
147
- http .StatusRequestEntityTooLarge ,
148
- fmt .Sprintf ("Request exceeds max-series-per-req limit (%d). Reduce the number of targets or ask your admin to increase the limit." , maxSeriesPerReq ))
149
- }
150
- _ , err = resp .UnmarshalMsg (r .buf )
151
- if err != nil {
152
- return nil , err
153
- }
154
139
155
- for pattern , nodes := range resp .Nodes {
156
- if len (series ) == maxSeries {
157
- return nil , response .NewError (
158
- http .StatusRequestEntityTooLarge ,
159
- fmt .Sprintf ("Request exceeds max-series-per-req limit (%d). Reduce the number of targets or ask your admin to increase the limit." , maxSeriesPerReq ))
160
- }
140
+ responseChan , errorChan := s .queryAllShardsGeneric (ctx , "findSeriesRemote" , fetchFn )
161
141
162
- series = append (series , Series {
163
- Pattern : pattern ,
164
- Node : r .peer ,
165
- Series : nodes ,
166
- })
167
- log .Debugf ("HTTP findSeries %d matches for %s found on %s" , len (nodes ), pattern , r .peer .GetName ())
142
+ MainLoop:
143
+ for {
144
+ select {
145
+ case <- ctx .Done ():
146
+ //request canceled
147
+ return nil , nil
148
+ case err := <- errorChan :
149
+ return nil , err
150
+ case r , ok := <- responseChan :
151
+ if ! ok {
152
+ break MainLoop
153
+ }
154
+ resp := models.IndexFindResp {}
155
+ _ , err := resp .UnmarshalMsg (r .resp .([]byte ))
156
+ if err != nil {
157
+ return nil , err
158
+ }
159
+ for pattern , nodes := range resp .Nodes {
160
+ s := Series {
161
+ Pattern : pattern ,
162
+ Node : r .peer ,
163
+ }
164
+ for _ , node := range nodes {
165
+ if node .Leaf {
166
+ leavesCnt ++
167
+ if maxSeries > 0 && leavesCnt > maxSeries {
168
+ return nil , response .NewError (
169
+ http .StatusRequestEntityTooLarge ,
170
+ fmt .Sprintf ("Request exceeds max-series-per-req limit (%d). Reduce the number of targets or ask your admin to increase the limit." , maxSeriesPerReq ))
171
+ }
172
+ } else if leavesOnly {
173
+ continue
174
+ }
175
+ s .Series = append (s .Series , node )
176
+ }
177
+ series = append (series , s )
178
+ log .Debugf ("HTTP findSeries %d matches for %s found on %s" , len (nodes ), pattern , r .peer .GetName ())
179
+ }
168
180
}
169
181
}
170
-
171
182
return series , nil
172
183
}
173
184
@@ -362,7 +373,7 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
362
373
}
363
374
nodes := make ([]idx.Node , 0 )
364
375
reqCtx := ctx .Req .Context ()
365
- series , err := s .findSeries (reqCtx , ctx .OrgId , []string {request .Query }, int64 (fromUnix ), maxSeriesPerReq )
376
+ series , err := s .findSeries (reqCtx , ctx .OrgId , []string {request .Query }, int64 (fromUnix ), false , maxSeriesPerReq )
366
377
if err != nil {
367
378
response .Write (ctx , response .WrapError (err ))
368
379
return
@@ -410,17 +421,13 @@ func (s *Server) metricsExpand(ctx *middleware.Context, request models.GraphiteE
410
421
for i , query := range request .Query {
411
422
i , query := i , query
412
423
g .Go (func () error {
413
- series , err := s .findSeries (errGroupCtx , ctx .OrgId , []string {query }, 0 , maxSeriesPerReq )
424
+ series , err := s .findSeries (errGroupCtx , ctx .OrgId , []string {query }, 0 , request . LeavesOnly , maxSeriesPerReq )
414
425
if err != nil {
415
426
return err
416
427
}
417
428
results [i ] = make (map [string ]struct {})
418
429
for _ , s := range series {
419
430
for _ , n := range s .Series {
420
- if request .LeavesOnly && ! n .Leaf {
421
- continue
422
- }
423
-
424
431
results [i ][n.Path ] = struct {}{}
425
432
}
426
433
}
@@ -855,7 +862,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan *expr.Plan)
855
862
}
856
863
series , err = s .clusterFindByTag (ctx , orgId , exprs , int64 (r .From ), findLimit , false )
857
864
} else {
858
- series , err = s .findSeries (ctx , orgId , []string {r .Query }, int64 (r .From ), findLimit )
865
+ series , err = s .findSeries (ctx , orgId , []string {r .Query }, int64 (r .From ), true , findLimit )
859
866
}
860
867
if err != nil {
861
868
return nil , meta , err
0 commit comments