This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Limit series from clusterByFind operation #1021

Merged. 12 commits merged on Oct 11, 2018.
api/cluster.go (113 additions, 81 deletions)
@@ -294,7 +294,7 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
     result := make(map[string]PeerResponse)
     for resp := range responses {
         if resp.err != nil {
-            return nil, err
+            return nil, resp.err
Contributor:

    oops!
         }
         result[resp.data.peer.GetName()] = resp.data
     }
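In miniature, the bug being fixed here (a hypothetical standalone reduction, not the actual metrictank code): the function-level err is nil (or stale) at this point in the loop, so returning err instead of resp.err silently reports success on a peer failure.

package main

import (
	"errors"
	"fmt"
)

type result struct{ err error }

func collect(results []result) error {
	var err error // nil here: earlier setup succeeded
	for _, resp := range results {
		if resp.err != nil {
			return err // BUG: returns the nil outer err; should be resp.err
		}
	}
	return nil
}

func main() {
	// Prints <nil> even though a peer failed.
	fmt.Println(collect([]result{{err: errors.New("peer failed")}}))
}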
@@ -303,112 +303,144 @@
 }
 
 // peerQuerySpeculative takes a request and the path to request it on, then fans it out
-// across the cluster, except to the local peer. If any peer fails requests to
-// other peers are aborted. If enough peers have been heard from (based on
-// speculation-threshold configuration), and we are missing the others, try to
-// speculatively query each other member of each shard group.
+// across the cluster. If any peer fails requests to other peers are aborted. If enough
+// peers have been heard from (based on speculation-threshold configuration), and we
+// are missing the others, try to speculatively query each other member of each shard group.
 // ctx: request context
 // data: request to be submitted
 // name: name to be used in logging & tracing
 // path: path to request on
 func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {
-    peerGroups, err := cluster.MembersForSpeculativeQuery()
-    if err != nil {
-        log.Errorf("HTTP peerQuery unable to get peers, %s", err.Error())
-        return nil, err
-    }
-    log.Debugf("HTTP %s across %d instances", name, len(peerGroups)-1)
+    result := make(map[string]PeerResponse)
 
-    reqCtx, cancel := context.WithCancel(ctx)
-    defer cancel()
+    responseChan, errorChan := s.peerQuerySpeculativeChan(ctx, data, name, path)
 
-    originalPeers := make(map[string]struct{}, len(peerGroups))
-    receivedResponses := make(map[int32]struct{}, len(peerGroups))
+    for resp := range responseChan {
+        result[resp.peer.GetName()] = resp
+    }
 
-    responses := make(chan struct {
-        shardGroup int32
-        data PeerResponse
-        err error
-    }, 1)
+    err := <-errorChan
+    return result, err
+}
 
-    askPeer := func(shardGroup int32, peer cluster.Node) {
-        log.Debugf("HTTP Render querying %s%s", peer.GetName(), path)
-        buf, err := peer.Post(reqCtx, name, path, data)
+// peerQuerySpeculativeChan takes a request and the path to request it on, then fans it out
+// across the cluster. If any peer fails requests to other peers are aborted. If enough
Contributor:

    I notice peerQuerySpeculative's description includes "...except to the local peer...", whereas this one does not. I think the former is wrong.
+// peers have been heard from (based on speculation-threshold configuration), and we
+// are missing the others, try to speculatively query other members of the shard group.
+// ctx: request context
+// data: request to be submitted
+// name: name to be used in logging & tracing
+// path: path to request on
+// resultChan: channel to put responses on as they come in
+func (s *Server) peerQuerySpeculativeChan(ctx context.Context, data cluster.Traceable, name, path string) (<-chan PeerResponse, <-chan error) {
+    resultChan := make(chan PeerResponse)
+    errorChan := make(chan error, 1)
 
-        select {
-        case <-ctx.Done():
-            return
-        default:
-            // Not canceled, continue
-        }
+    go func() {
+        defer close(errorChan)
+        defer close(resultChan)
Contributor:

    Minor thought: would it be cleaner/simpler to just have one return channel? We could return PeerResponse along with its error.

Collaborator Author:

    Possibly. I like the two channels, because you can just loop over responses until the channel is closed, then quickly try to read into an err var for the return value. See this usage.

Contributor:

    Fair enough.

-        if err != nil {
-            cancel()
-            log.Errorf("HTTP Render error querying %s%s: %q", peer.GetName(), path, err.Error())
-        }
-        responses <- struct {
-            shardGroup int32
-            data PeerResponse
-            err error
-        }{shardGroup, PeerResponse{peer, buf}, err}
-    }
-
-    for group, peers := range peerGroups {
-        peer := peers[0]
-        originalPeers[peer.GetName()] = struct{}{}
-        go askPeer(group, peer)
-    }
-
-    result := make(map[string]PeerResponse)
-
-    var ticker *time.Ticker
-    var tickChan <-chan time.Time
-    if speculationThreshold != 1 {
-        ticker = time.NewTicker(5 * time.Millisecond)
-        tickChan = ticker.C
-        defer ticker.Stop()
-    }
-
-    for len(receivedResponses) < len(peerGroups) {
-        select {
-        case resp := <-responses:
-            if _, ok := receivedResponses[resp.shardGroup]; ok {
-                // already received this response (possibly speculatively)
-                continue
-            }
-
-            if resp.err != nil {
-                return nil, err
-            }
-
-            result[resp.data.peer.GetName()] = resp.data
-            receivedResponses[resp.shardGroup] = struct{}{}
-            delete(originalPeers, resp.data.peer.GetName())
-
-        case <-tickChan:
-            // Check if it's time to speculate!
-            percentReceived := float64(len(receivedResponses)) / float64(len(peerGroups))
-            if percentReceived >= speculationThreshold {
-                // kick off speculative queries to other members now
-                ticker.Stop()
-                speculativeAttempts.Inc()
-                for shardGroup, peers := range peerGroups {
-                    if _, ok := receivedResponses[shardGroup]; ok {
-                        continue
-                    }
-                    eligiblePeers := peers[1:]
-                    for _, peer := range eligiblePeers {
-                        speculativeRequests.Inc()
-                        go askPeer(shardGroup, peer)
-                    }
-                }
-            }
-        }
-    }
-
-    if len(originalPeers) > 0 {
-        speculativeWins.Inc()
-    }
-
-    return result, nil
+        peerGroups, err := cluster.MembersForSpeculativeQuery()
+        if err != nil {
+            log.Errorf("HTTP peerQuery unable to get peers, %s", err.Error())
+            errorChan <- err
+            return
+        }
+        log.Debugf("HTTP %s across %d instances", name, len(peerGroups)-1)
+
+        reqCtx, cancel := context.WithCancel(ctx)
+        defer cancel()
+
+        originalPeers := make(map[string]struct{}, len(peerGroups))
+        receivedResponses := make(map[int32]struct{}, len(peerGroups))
+
+        responses := make(chan struct {
+            shardGroup int32
+            data PeerResponse
+            err error
+        }, 1)
+
+        askPeer := func(shardGroup int32, peer cluster.Node) {
+            log.Debugf("HTTP Render querying %s%s", peer.GetName(), path)
+            buf, err := peer.Post(reqCtx, name, path, data)
+
+            select {
+            case <-ctx.Done():
+                return
+            default:
+                // Not canceled, continue
+            }
+            if err != nil {
+                cancel()
+                log.Errorf("HTTP Render error querying %s%s: %q", peer.GetName(), path, err)
+            }
+            responses <- struct {
+                shardGroup int32
+                data PeerResponse
+                err error
+            }{shardGroup, PeerResponse{peer, buf}, err}
+        }
+
+        for group, peers := range peerGroups {
+            peer := peers[0]
+            originalPeers[peer.GetName()] = struct{}{}
+            go askPeer(group, peer)
+        }
+
+        var ticker *time.Ticker
+        var tickChan <-chan time.Time
+        if speculationThreshold != 1 {
+            ticker = time.NewTicker(5 * time.Millisecond)
+            tickChan = ticker.C
+            defer ticker.Stop()
+        }
+
+        for len(receivedResponses) < len(peerGroups) {
+            select {
+            case <-ctx.Done():
+                //request canceled
+                return
+            case resp := <-responses:
+                if _, ok := receivedResponses[resp.shardGroup]; ok {
+                    // already received this response (possibly speculatively)
+                    continue
+                }
+
+                if resp.err != nil {
+                    errorChan <- resp.err
+                    return
+                }
+
+                resultChan <- resp.data
+                receivedResponses[resp.shardGroup] = struct{}{}
+                delete(originalPeers, resp.data.peer.GetName())
+
+            case <-tickChan:
+                // Check if it's time to speculate!
+                percentReceived := float64(len(receivedResponses)) / float64(len(peerGroups))
+                if percentReceived >= speculationThreshold {
+                    // kick off speculative queries to other members now
+                    ticker.Stop()
+                    speculativeAttempts.Inc()
+                    for shardGroup, peers := range peerGroups {
+                        if _, ok := receivedResponses[shardGroup]; ok {
+                            continue
+                        }
+                        eligiblePeers := peers[1:]
+                        for _, peer := range eligiblePeers {
+                            speculativeRequests.Inc()
+                            go askPeer(shardGroup, peer)
+                        }
+                    }
+                }
+            }
+        }
+
+        if len(originalPeers) > 0 {
+            speculativeWins.Inc()
+        }
+    }()
+
+    return resultChan, errorChan
 }
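As a side note on the speculation mechanics above: every 5 ms the loop checks whether the fraction of shard groups that have answered has reached speculation-threshold; if so, redundant queries go out to the other members of the still-missing groups. A standalone sketch of just that trigger condition, with made-up numbers:

package main

import "fmt"

func main() {
	speculationThreshold := 0.75 // e.g. a speculation-threshold setting
	peerGroups := 4              // shard groups being queried
	for received := 0; received <= peerGroups; received++ {
		percentReceived := float64(received) / float64(peerGroups)
		// With these numbers, speculation kicks in once 3 of 4 groups answered.
		fmt.Printf("received %d/%d (%.2f): speculate=%v\n",
			received, peerGroups, percentReceived,
			percentReceived >= speculationThreshold)
	}
}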
api/config.go (2 additions, 0 deletions)
@@ -15,6 +15,7 @@ import (
 var (
     maxPointsPerReqSoft int
     maxPointsPerReqHard int
+    maxSeriesPerReq int
     logMinDurStr string
     logMinDur uint32

@@ -39,6 +40,7 @@ func ConfigSetup() {
     apiCfg := flag.NewFlagSet("http", flag.ExitOnError)
     apiCfg.IntVar(&maxPointsPerReqSoft, "max-points-per-req-soft", 1000000, "lower resolution rollups will be used to try and keep requests below this number of datapoints. (0 disables limit)")
     apiCfg.IntVar(&maxPointsPerReqHard, "max-points-per-req-hard", 20000000, "limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)")
+    apiCfg.IntVar(&maxSeriesPerReq, "max-series-per-req", 250000, "limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)")
     apiCfg.StringVar(&logMinDurStr, "log-min-dur", "5min", "only log incoming requests if their timerange is at least this duration. Use 0 to disable")
 
     apiCfg.StringVar(&Addr, "listen", ":6060", "http listener address.")
api/graphite.go (21 additions, 17 deletions)
@@ -3,6 +3,7 @@ package api
 import (
     "context"
     "errors"
+    "fmt"
     "math"
     "net/http"
     "sort"
@@ -607,7 +608,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan expr.Plan)
     for i, e := range exprs {
         exprs[i] = strings.Trim(e, " '\"")
     }
-    series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From))
+    series, err = s.clusterFindByTag(ctx, orgId, exprs, int64(r.From), maxSeriesPerReq-len(reqs))
 } else {
     series, err = s.findSeries(ctx, orgId, []string{r.Query}, int64(r.From))
 }
@@ -798,7 +799,7 @@ func (s *Server) clusterTagDetails(ctx context.Context, orgId uint32, tag, filte
 
 func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.GraphiteTagFindSeries) {
     reqCtx := ctx.Req.Context()
-    series, err := s.clusterFindByTag(reqCtx, ctx.OrgId, request.Expr, request.From)
+    series, err := s.clusterFindByTag(reqCtx, ctx.OrgId, request.Expr, request.From, maxSeriesPerReq)
     if err != nil {
         response.Write(ctx, response.WrapError(err))
         return
@@ -818,28 +819,30 @@ func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.G
     response.Write(ctx, response.NewJson(200, seriesNames, ""))
 }

-func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64) ([]Series, error) {
+func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64, maxSeries int) ([]Series, error) {
     data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
-    resps, err := s.peerQuerySpeculative(ctx, data, "clusterFindByTag", "/index/find_by_tag")
-    if err != nil {
-        return nil, err
-    }
-
-    select {
-    case <-ctx.Done():
-        //request canceled
-        return nil, nil
-    default:
-    }
+    newCtx, cancel := context.WithCancel(ctx)
+    responseChan, errorChan := s.peerQuerySpeculativeChan(newCtx, data, "clusterFindByTag", "/index/find_by_tag")
 
     var allSeries []Series
 
-    for _, r := range resps {
+    for r := range responseChan {
         resp := models.IndexFindByTagResp{}
-        _, err = resp.UnmarshalMsg(r.buf)
+        _, err := resp.UnmarshalMsg(r.buf)
         if err != nil {
+            cancel()
Collaborator Author:

    Would it make sense to just defer cancel() at the top like the peerQuery functions do?

    Also, how much does this canceling really buy us if the top-level ctx will be Done when the error is returned?

Contributor (@Dieterbe, Oct 10, 2018):

    Not sure what you mean with a ctx "being Done". ctx.Done() just returns a channel that signals cancellation (by getting closed), but you still need to call a corresponding cancel function for the cancellation to happen.

    I see 3 callers of this function:

      • /render -> renderMetrics -> executePlan
      • /tags/findSeries (graphiteTagFindSeries)
      • querier.Select

    We would need all 3 to call a cancel func for my change to be unnecessary, but from what I can tell, neither of them, or at least not the first two, does this (unless Macaron automatically calls cancel on our behalf?).

    Regardless, it shouldn't take much code spelunking for a reader to assert whether we cancel a context when we should, so I'd rather call cancel wherever it seems sensible, even if we call it multiple times (which is harmless). I think @woodsaj was trying to establish this convention when he first added cancellations throughout this code base.

    As for using defer vs just putting the code at the return sites: for short functions it's really debatable and depends more on taste, so unless you have a strong argument, let's just stick with it.

Collaborator Author:

    Re: Done

    I mean that peerQuerySpeculativeChan checks if the context is Done(): https://github.com/grafana/metrictank/pull/1021/files#diff-bc8a656be21edce0cf2a74adf23d7aeaR401

    Done isn't just done on cancel, but also on a final response being returned. So, as soon as the error propagates up and a response is delivered, peerQuerySpeculativeChan will break its loop and call cancel(). This is done specifically so callers don't need to implement cancellation themselves. Granted, doing it directly will cancel a few milliseconds earlier, so might be marginally cheaper.

Contributor:

    The cancels that I added in clusterFindByTag target cases that peerQuerySpeculativeChan cannot detect itself. These are:

      • an unmarshalling error
      • exceeding maxSeriesPerReq

    The cancels triggered in peerQuerySpeculativeChan (upon an erroring peer.Post and upon function return) would be triggered not at all, or much later, respectively.

Collaborator Author:

    > the cancels that I added in clusterFindByTag target cases that peerQuerySpeculativeChan cannot detect itself.

    True, but as long as these failures cause an error response to the HTTP request, the parent context's Done channel will be closed and peerQuerySpeculativeChan will short-circuit and call cancel on the context it created.

    This is the case in the 3 examples you posted, but I guess it's too much for clusterFindByTag to assume?

Contributor:

    > the parent context's Done channel will be closed

    Where does this happen?

Collaborator Author:

    TBH, I didn't come to this conclusion by code inspection, but rather by logging when early responses happened. It definitely was how it behaved, but I couldn't tell you if it's by contract or a symptom of how I was querying.

Contributor:

    I see. Well, this reinforces my point that these matters should be made much more obvious, hence rather one call to cancel too many than too few.

    Good to merge?

Collaborator Author:

    Yeah, let me rebase.
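A standalone sketch of the context semantics debated above: Done() only closes once a CancelFunc is called (or a parent is canceled or times out); merely returning an error from a handler does not by itself cancel anything. Names and values here are illustrative.

package main

import (
	"context"
	"fmt"
)

func main() {
	parent := context.Background()

	// Derive a cancelable child, as clusterFindByTag does with
	// newCtx, cancel := context.WithCancel(ctx).
	child, cancel := context.WithCancel(parent)

	select {
	case <-child.Done():
		fmt.Println("done before cancel") // never happens
	default:
		fmt.Println("not done yet: no one has called cancel()")
	}

	cancel() // e.g. on an unmarshal error, or when the series limit is exceeded

	<-child.Done()                            // now closed
	fmt.Println("after cancel:", child.Err()) // context.Canceled
}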

             return nil, err
         }
 
+        // 0 disables the check, so only check if maxSeriesPerReq > 0
+        if maxSeriesPerReq > 0 && len(resp.Metrics)+len(allSeries) > maxSeries {
+            cancel()
+            return nil,
Member:

    Shouldn't you cancel ctx so outstanding peer queries are aborted?

Collaborator Author (@shanson7, Aug 28, 2018):

    Technically, the query will be Doned when the error propagates up and a response is returned. But canceling it here could do it a little bit earlier.

Collaborator Author:

    I'm also not entirely sure how to cancel this context here. I suppose we would need to make a new cancelable context.

Contributor:

    That makes sense, I think. Since we only get the ctx passed in, we don't know what the cancel func was. However, if you do something like newCtx, cancel := context.WithCancel(ctx) and pass newCtx into peerQuerySpeculativeChan, then you can cancel it.
+                response.NewError(
+                    http.StatusRequestEntityTooLarge,
+                    fmt.Sprintf("Request exceeds max-series-per-req limit (%d). Reduce the number of targets or ask your admin to increase the limit.", maxSeriesPerReq))
+        }
 
         for _, series := range resp.Metrics {
             allSeries = append(allSeries, Series{
                 Pattern: series.Path,
@@ -849,7 +852,8 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
         }
     }
 
-    return allSeries, nil
+    err := <-errorChan
+    return allSeries, err
 }
 
 func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTags) {
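The consumer-side early exit that clusterFindByTag now implements can be reduced to a minimal, self-contained sketch (illustrative names and numbers): results stream in over a channel, a running total enforces the limit, and cancel() tells the producer to stop early.

package main

import (
	"context"
	"fmt"
)

// producer streams batches of "series" until its context is canceled.
func producer(ctx context.Context) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return // consumer gave up: stop producing
			case out <- []int{i, i, i}: // one peer response worth of series
			}
		}
	}()
	return out
}

func main() {
	maxSeries := 7
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var all []int
	for batch := range producer(ctx) {
		if len(all)+len(batch) > maxSeries {
			cancel() // mirrors the cancel() before returning the 413 error
			fmt.Println("limit exceeded, aborting")
			break
		}
		all = append(all, batch...)
	}
	fmt.Println("collected", len(all), "series")
}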
api/prometheus_querier.go (1 addition, 1 deletion)
@@ -64,7 +64,7 @@ func (q *querier) Select(matchers ...*labels.Matcher) (storage.SeriesSet, error)
         }
     }
 
-    series, err := q.clusterFindByTag(q.ctx, q.OrgID, expressions, 0)
+    series, err := q.clusterFindByTag(q.ctx, q.OrgID, expressions, 0, maxSeriesPerReq)
     if err != nil {
         return nil, err
     }
cluster/node.go (3 additions, 1 deletion)
@@ -6,6 +6,7 @@ import (
     "encoding/json"
     "errors"
     "fmt"
+    "io"
     "io/ioutil"
     "net/http"
     "time"
@@ -202,7 +203,8 @@ func (n HTTPNode) GetName() string {
 func handleResp(rsp *http.Response) ([]byte, error) {
     defer rsp.Body.Close()
     if rsp.StatusCode != 200 {
-        ioutil.ReadAll(rsp.Body)
+        // Read in body so that the connection can be reused
+        io.Copy(ioutil.Discard, rsp.Body)
         return nil, NewError(rsp.StatusCode, fmt.Errorf(rsp.Status))
     }
     return ioutil.ReadAll(rsp.Body)
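For context on the comment above: Go's http.Transport only returns a keep-alive connection to its pool once the response body has been read to EOF and closed, so draining via io.Copy keeps the connection reusable without buffering the error body in memory the way ioutil.ReadAll did. A minimal sketch (the URL is illustrative):

package main

import (
	"io"
	"io/ioutil"
	"log"
	"net/http"
)

func fetch(url string) error {
	rsp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer rsp.Body.Close()
	if rsp.StatusCode != http.StatusOK {
		// Drain without buffering so the connection goes back into the pool.
		io.Copy(ioutil.Discard, rsp.Body)
		return nil
	}
	_, err = io.Copy(ioutil.Discard, rsp.Body) // consume the payload
	return err
}

func main() {
	// Two requests to the same host can share one connection only because
	// fetch fully drains each response body.
	for i := 0; i < 2; i++ {
		if err := fetch("http://example.com/"); err != nil {
			log.Println(err)
		}
	}
}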
docker/docker-chaos/metrictank.ini (2 additions, 0 deletions)
@@ -153,6 +153,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite
docker/docker-cluster/metrictank.ini (2 additions, 0 deletions)
@@ -153,6 +153,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite
docker/docker-dev-custom-cfg-kafka/metrictank.ini (2 additions, 0 deletions)
@@ -153,6 +153,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite
docs/config.md (2 additions, 0 deletions)
@@ -194,6 +194,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite
metrictank-sample.ini (2 additions, 0 deletions)
@@ -156,6 +156,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite
scripts/config/metrictank-docker.ini (2 additions, 0 deletions)
@@ -153,6 +153,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite
scripts/config/metrictank-package.ini (2 additions, 0 deletions)
@@ -153,6 +153,8 @@ key-file = /etc/ssl/private/ssl-cert-snakeoil.key
 max-points-per-req-soft = 1000000
 # limit of number of datapoints a request can return. Requests that exceed this limit will be rejected. (0 disables limit)
 max-points-per-req-hard = 20000000
+# limit of number of series a request can operate on. Requests that exceed this limit will be rejected. (0 disables limit)
+max-series-per-req = 250000
 # require x-org-id authentication to auth as a specific org. otherwise orgId 1 is assumed
 multi-tenant = true
 # in case our /render endpoint does not support the requested processing, proxy the request to this graphite