Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 2d03fbc

Browse files
authored
Merge branch 'master' into asPercent
2 parents 5692856 + 2cb286b commit 2d03fbc

34 files changed

+723
-214
lines changed

api/cluster.go

+125
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,31 @@ import (
77
"net/http"
88
"strconv"
99
"sync"
10+
"time"
1011

1112
"github.com/grafana/metrictank/api/middleware"
1213
"github.com/grafana/metrictank/api/models"
1314
"github.com/grafana/metrictank/api/response"
1415
"github.com/grafana/metrictank/cluster"
16+
"github.com/grafana/metrictank/stats"
1517
"github.com/raintank/worldping-api/pkg/log"
1618
"github.com/tinylib/msgp/msgp"
1719
)
1820

1921
var NotFoundErr = errors.New("not found")
2022

23+
var (
24+
25+
// metric api.cluster.speculative.attempts is how many peer queries resulted in speculation
26+
speculativeAttempts = stats.NewCounter32("api.cluster.speculative.attempts")
27+
28+
// metric api.cluster.speculative.wins is how many peer queries were improved due to speculation
29+
speculativeWins = stats.NewCounter32("api.cluster.speculative.wins")
30+
31+
// metric api.cluster.speculative.requests is how many speculative http requests made to peers
32+
speculativeRequests = stats.NewCounter32("api.cluster.speculative.requests")
33+
)
34+
2135
func (s *Server) explainPriority(ctx *middleware.Context) {
2236
var data []interface{}
2337
for _, p := range s.prioritySetters {
@@ -287,3 +301,114 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa
287301

288302
return result, nil
289303
}
304+
305+
// peerQuerySpeculative takes a request and the path to request it on, then fans it out
306+
// across the cluster, except to the local peer. If any peer fails requests to
307+
// other peers are aborted. If enough peers have been heard from (based on
308+
// speculation-threshold configuration), and we are missing the others, try to
309+
// speculatively query each other member of each shard group.
310+
// ctx: request context
311+
// data: request to be submitted
312+
// name: name to be used in logging & tracing
313+
// path: path to request on
314+
func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {
315+
peerGroups, err := cluster.MembersForSpeculativeQuery()
316+
if err != nil {
317+
log.Error(3, "HTTP peerQuery unable to get peers, %s", err)
318+
return nil, err
319+
}
320+
log.Debug("HTTP %s across %d instances", name, len(peerGroups)-1)
321+
322+
reqCtx, cancel := context.WithCancel(ctx)
323+
defer cancel()
324+
325+
originalPeers := make(map[string]struct{}, len(peerGroups))
326+
receivedResponses := make(map[int32]struct{}, len(peerGroups))
327+
328+
responses := make(chan struct {
329+
shardGroup int32
330+
data PeerResponse
331+
err error
332+
}, 1)
333+
334+
askPeer := func(shardGroup int32, peer cluster.Node) {
335+
log.Debug("HTTP Render querying %s%s", peer.GetName(), path)
336+
buf, err := peer.Post(reqCtx, name, path, data)
337+
338+
select {
339+
case <-ctx.Done():
340+
return
341+
default:
342+
// Not canceled, continue
343+
}
344+
345+
if err != nil {
346+
cancel()
347+
log.Error(4, "HTTP Render error querying %s%s: %q", peer.GetName(), path, err)
348+
}
349+
responses <- struct {
350+
shardGroup int32
351+
data PeerResponse
352+
err error
353+
}{shardGroup, PeerResponse{peer, buf}, err}
354+
}
355+
356+
for group, peers := range peerGroups {
357+
peer := peers[0]
358+
originalPeers[peer.GetName()] = struct{}{}
359+
go askPeer(group, peer)
360+
}
361+
362+
result := make(map[string]PeerResponse)
363+
364+
var ticker *time.Ticker
365+
var tickChan <-chan time.Time
366+
if speculationThreshold != 1 {
367+
ticker = time.NewTicker(5 * time.Millisecond)
368+
tickChan = ticker.C
369+
defer ticker.Stop()
370+
}
371+
372+
for len(receivedResponses) < len(peerGroups) {
373+
select {
374+
case resp := <-responses:
375+
if _, ok := receivedResponses[resp.shardGroup]; ok {
376+
// already received this response (possibly speculatively)
377+
continue
378+
}
379+
380+
if resp.err != nil {
381+
return nil, err
382+
}
383+
384+
result[resp.data.peer.GetName()] = resp.data
385+
receivedResponses[resp.shardGroup] = struct{}{}
386+
delete(originalPeers, resp.data.peer.GetName())
387+
388+
case <-tickChan:
389+
// Check if it's time to speculate!
390+
percentReceived := float64(len(receivedResponses)) / float64(len(peerGroups))
391+
if percentReceived >= speculationThreshold {
392+
// kick off speculative queries to other members now
393+
ticker.Stop()
394+
speculativeAttempts.Inc()
395+
for shardGroup, peers := range peerGroups {
396+
if _, ok := receivedResponses[shardGroup]; ok {
397+
continue
398+
}
399+
eligiblePeers := peers[1:]
400+
for _, peer := range eligiblePeers {
401+
speculativeRequests.Inc()
402+
go askPeer(shardGroup, peer)
403+
}
404+
}
405+
}
406+
}
407+
}
408+
409+
if len(originalPeers) > 0 {
410+
speculativeWins.Inc()
411+
}
412+
413+
return result, nil
414+
}

api/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ var (
2929

3030
getTargetsConcurrency int
3131
tagdbDefaultLimit uint
32+
speculationThreshold float64
3233

3334
graphiteProxy *httputil.ReverseProxy
3435
timeZone *time.Location
@@ -50,6 +51,7 @@ func ConfigSetup() {
5051
apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone")
5152
apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.")
5253
apiCfg.UintVar(&tagdbDefaultLimit, "tagdb-default-limit", 100, "default limit for tagdb query results, can be overridden with query parameter \"limit\"")
54+
apiCfg.Float64Var(&speculationThreshold, "speculation-threshold", 1, "ratio of peer responses after which speculation is used. Set to 1 to disable.")
5355
globalconf.Register("http", apiCfg)
5456
}
5557

0 commit comments

Comments
 (0)