Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Speculative queries #956

Merged
merged 9 commits into from
Jul 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,31 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/grafana/metrictank/api/middleware"
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/api/response"
"github.com/grafana/metrictank/cluster"
"github.com/grafana/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
"github.com/tinylib/msgp/msgp"
)

var NotFoundErr = errors.New("not found")

var (

// metric api.cluster.speculative.attempts is how many peer queries resulted in speculation
speculativeAttempts = stats.NewCounter32("api.cluster.speculative.attempts")

// metric api.cluster.speculative.wins is how many peer queries were improved due to speculation
speculativeWins = stats.NewCounter32("api.cluster.speculative.wins")

// metric api.cluster.speculative.requests is how many speculative http requests made to peers
speculativeRequests = stats.NewCounter32("api.cluster.speculative.requests")
)

func (s *Server) explainPriority(ctx *middleware.Context) {
var data []interface{}
for _, p := range s.prioritySetters {
Expand Down Expand Up @@ -287,3 +301,108 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa

return result, nil
}

// peerQuerySpeculative takes a request and the path to request it on, then fans it out
// across the cluster, except to the local peer. If any peer fails requests to
// other peers are aborted. If enough peers have been heard from (based on
// speculation-threshold configuration), and we are missing the others, try to
// speculatively query other members of the shard group.
// ctx: request context
// data: request to be submitted
// name: name to be used in logging & tracing
// path: path to request on
func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {
peerGroups, err := cluster.MembersForSpeculativeQuery()
if err != nil {
log.Error(3, "HTTP peerQuery unable to get peers, %s", err)
return nil, err
}
log.Debug("HTTP %s across %d instances", name, len(peerGroups)-1)

reqCtx, cancel := context.WithCancel(ctx)
defer cancel()

originalPeers := make(map[string]struct{}, len(peerGroups))
pendingResponses := make(map[int32]struct{}, len(peerGroups))
receivedResponses := make(map[int32]struct{}, len(peerGroups))

responses := make(chan struct {
shardGroup int32
data PeerResponse
err error
}, 1)

askPeer := func(shardGroup int32, peer cluster.Node) {
log.Debug("HTTP Render querying %s%s", peer.GetName(), path)
buf, err := peer.Post(reqCtx, name, path, data)

select {
case <-ctx.Done():
return
default:
// Not canceled, continue
}

if err != nil {
cancel()
log.Error(4, "HTTP Render error querying %s%s: %q", peer.GetName(), path, err)
}
responses <- struct {
shardGroup int32
data PeerResponse
err error
}{shardGroup, PeerResponse{peer, buf}, err}
}

for group, peers := range peerGroups {
peer := peers[0]
originalPeers[peer.GetName()] = struct{}{}
pendingResponses[group] = struct{}{}
go askPeer(group, peer)
}

result := make(map[string]PeerResponse)

specCheckTicker := time.NewTicker(5 * time.Millisecond)

for len(pendingResponses) > 0 {
select {
case resp := <-responses:
if _, ok := receivedResponses[resp.shardGroup]; ok {
// already received this response (possibly speculatively)
continue
}

if resp.err != nil {
return nil, err
}

result[resp.data.peer.GetName()] = resp.data
receivedResponses[resp.shardGroup] = struct{}{}
delete(pendingResponses, resp.shardGroup)
delete(originalPeers, resp.data.peer.GetName())

case <-specCheckTicker.C:
// Check if it's time to speculate!
percentReceived := 1 - (float64(len(pendingResponses)) / float64(len(peerGroups)))
if percentReceived > speculationThreshold {
// kick off speculative queries to other members now
specCheckTicker.Stop()
speculativeAttempts.Inc()
for shardGroup := range pendingResponses {
eligiblePeers := peerGroups[shardGroup][1:]
for _, peer := range eligiblePeers {
speculativeRequests.Inc()
go askPeer(shardGroup, peer)
}
}
}
}
}

if len(originalPeers) > 0 {
speculativeWins.Inc()
}

return result, nil
}
2 changes: 2 additions & 0 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var (

getTargetsConcurrency int
tagdbDefaultLimit uint
speculationThreshold float64

graphiteProxy *httputil.ReverseProxy
timeZone *time.Location
Expand All @@ -50,6 +51,7 @@ func ConfigSetup() {
apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone")
apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.")
apiCfg.UintVar(&tagdbDefaultLimit, "tagdb-default-limit", 100, "default limit for tagdb query results, can be overridden with query parameter \"limit\"")
apiCfg.Float64Var(&speculationThreshold, "speculation-threshold", 1, "ratio of peer responses after which speculation is used. Set to 1 to disable.")
globalconf.Register("http", apiCfg)
}

Expand Down
158 changes: 35 additions & 123 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,64 +55,39 @@ type Series struct {
}

func (s *Server) findSeries(ctx context.Context, orgId uint32, patterns []string, seenAfter int64) ([]Series, error) {
peers, err := cluster.MembersForQuery()
data := models.IndexFind{
Patterns: patterns,
OrgId: orgId,
From: seenAfter,
}

resps, err := s.peerQuerySpeculative(ctx, data, "findSeriesRemote", "/index/find")
if err != nil {
log.Error(3, "HTTP findSeries unable to get peers, %s", err)
return nil, err
}
log.Debug("HTTP findSeries for %v across %d instances", patterns, len(peers))
var wg sync.WaitGroup

responses := make(chan struct {
series []Series
err error
}, 1)
findCtx, cancel := context.WithCancel(ctx)
defer cancel()
for _, peer := range peers {
log.Debug("HTTP findSeries getting results from %s", peer.GetName())
wg.Add(1)
if peer.IsLocal() {
go func() {
result, err := s.findSeriesLocal(findCtx, orgId, patterns, seenAfter)
if err != nil {
// cancel requests on all other peers.
cancel()
}
responses <- struct {
series []Series
err error
}{result, err}
wg.Done()
}()
} else {
go func(peer cluster.Node) {
result, err := s.findSeriesRemote(findCtx, orgId, patterns, seenAfter, peer)
if err != nil {
// cancel requests on all other peers.
cancel()
}
responses <- struct {
series []Series
err error
}{result, err}
wg.Done()
}(peer)
}
select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}

// wait for all findSeries goroutines to end, then close our responses channel
go func() {
wg.Wait()
close(responses)
}()

series := make([]Series, 0)
for resp := range responses {
if resp.err != nil {
resp := models.IndexFindResp{}
for _, r := range resps {
_, err = resp.UnmarshalMsg(r.buf)
if err != nil {
return nil, err
}
series = append(series, resp.series...)

for pattern, nodes := range resp.Nodes {
series = append(series, Series{
Pattern: pattern,
Node: r.peer,
Series: nodes,
})
log.Debug("HTTP findSeries %d matches for %s found on %s", len(nodes), pattern, r.peer.GetName())
}
}

return series, nil
Expand Down Expand Up @@ -864,22 +839,10 @@ func (s *Server) graphiteTagDetails(ctx *middleware.Context, request models.Grap
}

func (s *Server) clusterTagDetails(ctx context.Context, orgId uint32, tag, filter string, from int64) (map[string]uint64, error) {
result, err := s.MetricIndex.TagDetails(orgId, tag, filter, from)
if err != nil {
return nil, err
}
if result == nil {
result = make(map[string]uint64)
}
select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}
result := make(map[string]uint64)

data := models.IndexTagDetails{OrgId: orgId, Tag: tag, Filter: filter, From: from}
resps, err := s.peerQuery(ctx, data, "clusterTagDetails", "/index/tag_details", false)
resps, err := s.peerQuerySpeculative(ctx, data, "clusterTagDetails", "/index/tag_details")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -926,7 +889,8 @@ func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.G
}

func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64) ([]Series, error) {
result, err := s.MetricIndex.FindByTag(orgId, expressions, from)
data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
resps, err := s.peerQuerySpeculative(ctx, data, "clusterFindByTag", "/index/find_by_tag")
if err != nil {
return nil, err
}
Expand All @@ -940,27 +904,6 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions

var allSeries []Series

for _, series := range result {
allSeries = append(allSeries, Series{
Pattern: series.Path,
Node: cluster.Manager.ThisNode(),
Series: []idx.Node{series},
})
}

data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
resps, err := s.peerQuery(ctx, data, "clusterFindByTag", "/index/find_by_tag", false)
if err != nil {
return nil, err
}

select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}

for _, r := range resps {
resp := models.IndexFindByTagResp{}
_, err = resp.UnmarshalMsg(r.buf)
Expand Down Expand Up @@ -1003,24 +946,8 @@ func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTa
}

func (s *Server) clusterTags(ctx context.Context, orgId uint32, filter string, from int64) ([]string, error) {
result, err := s.MetricIndex.Tags(orgId, filter, from)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}

tagSet := make(map[string]struct{}, len(result))
for _, tag := range result {
tagSet[tag] = struct{}{}
}

data := models.IndexTags{OrgId: orgId, Filter: filter, From: from}
resps, err := s.peerQuery(ctx, data, "clusterTags", "/index/tags", false)
resps, err := s.peerQuerySpeculative(ctx, data, "clusterTags", "/index/tags")
if err != nil {
return nil, err
}
Expand All @@ -1032,6 +959,7 @@ func (s *Server) clusterTags(ctx context.Context, orgId uint32, filter string, f
default:
}

tagSet := make(map[string]struct{})
resp := models.IndexTagsResp{}
for _, r := range resps {
_, err = resp.UnmarshalMsg(r.buf)
Expand Down Expand Up @@ -1066,18 +994,10 @@ func (s *Server) graphiteAutoCompleteTags(ctx *middleware.Context, request model
}

func (s *Server) clusterAutoCompleteTags(ctx context.Context, orgId uint32, prefix string, expressions []string, from int64, limit uint) ([]string, error) {
result, err := s.MetricIndex.FindTags(orgId, prefix, expressions, from, limit)
if err != nil {
return nil, err
}

tagSet := make(map[string]struct{}, len(result))
for _, tag := range result {
tagSet[tag] = struct{}{}
}
tagSet := make(map[string]struct{})

data := models.IndexAutoCompleteTags{OrgId: orgId, Prefix: prefix, Expr: expressions, From: from, Limit: limit}
responses, err := s.peerQuery(ctx, data, "clusterAutoCompleteTags", "/index/tags/autoComplete/tags", false)
responses, err := s.peerQuerySpeculative(ctx, data, "clusterAutoCompleteTags", "/index/tags/autoComplete/tags")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1121,18 +1041,10 @@ func (s *Server) graphiteAutoCompleteTagValues(ctx *middleware.Context, request
}

func (s *Server) clusterAutoCompleteTagValues(ctx context.Context, orgId uint32, tag, prefix string, expressions []string, from int64, limit uint) ([]string, error) {
result, err := s.MetricIndex.FindTagValues(orgId, tag, prefix, expressions, from, limit)
if err != nil {
return nil, err
}

valSet := make(map[string]struct{}, len(result))
for _, val := range result {
valSet[val] = struct{}{}
}
valSet := make(map[string]struct{})

data := models.IndexAutoCompleteTagValues{OrgId: orgId, Tag: tag, Prefix: prefix, Expr: expressions, From: from, Limit: limit}
responses, err := s.peerQuery(ctx, data, "clusterAutoCompleteValues", "/index/tags/autoComplete/values", false)
responses, err := s.peerQuerySpeculative(ctx, data, "clusterAutoCompleteValues", "/index/tags/autoComplete/values")
if err != nil {
return nil, err
}
Expand Down
Loading