Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement query sharding
Browse files Browse the repository at this point in the history
fpetkovski committed May 14, 2022

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 513d8f1 commit 6a4a646
Showing 39 changed files with 1,838 additions and 250 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
@@ -136,6 +136,8 @@ func registerQueryFrontend(app *extkingpin.App) {

cmd.Flag("query-frontend.forward-header", "List of headers forwarded by the query-frontend to downstream queriers, default is empty").PlaceHolder("<http-header-name>").StringsVar(&cfg.ForwardHeaders)

cmd.Flag("query-frontend.num-shards", "Number of queriers to use when sharding PromQL queries").IntVar(&cfg.NumShards)

cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall : Logs the finish call of the requests. LogStartAndFinishCall : Logs the start and finish call of the requests. NoLogCall : Disable request logging.").Default("").EnumVar(&cfg.RequestLoggingDecision, "NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "")
reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

5 changes: 3 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
@@ -334,8 +334,9 @@ func setupAndRunGRPCServer(g *run.Group,
if isReady() {
minTime, maxTime := mts.TimeRange()
return &infopb.StoreInfo{
MinTime: minTime,
MaxTime: maxTime,
MinTime: minTime,
MaxTime: maxTime,
SupportsSharding: true,
}
}
return nil
5 changes: 3 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
@@ -268,8 +268,9 @@ func runSidecar(
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
}
}
return nil
5 changes: 3 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
@@ -393,8 +393,9 @@ func runStore(
if httpProbe.IsReady() {
mint, maxt := bs.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
}
}
return nil
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -98,6 +98,8 @@ require (
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

require github.com/stretchr/testify v1.7.0

require (
cloud.google.com/go v0.99.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
@@ -186,7 +188,6 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/sony/gobreaker v0.4.1 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/weaveworks/promrus v1.2.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da // indirect
go.elastic.co/apm/module/apmhttp v1.11.0 // indirect
2 changes: 2 additions & 0 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
@@ -78,6 +78,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
request.ShardInfo,
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
@@ -145,6 +146,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
request.ShardInfo,
)

startTime := time.Unix(request.StartTimeSeconds, 0)
234 changes: 171 additions & 63 deletions pkg/api/query/querypb/query.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/api/query/querypb/query.proto
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ option go_package = "querypb";

import "gogoproto/gogo.proto";
import "store/storepb/types.proto";
import "store/storepb/rpc.proto";
import "store/storepb/prompb/types.proto";

option (gogoproto.sizer_all) = true;
@@ -36,6 +37,8 @@ message QueryRequest {
bool enablePartialResponse = 8;
bool enableQueryPushdown = 9;
bool skipChunks = 10;

ShardInfo shard_info = 11;
}

message StoreMatchers {
@@ -70,6 +73,8 @@ message QueryRangeRequest {
bool enablePartialResponse = 10;
bool enableQueryPushdown = 11;
bool skipChunks = 12;

ShardInfo shard_info = 13;
}

message QueryRangeResponse {
41 changes: 36 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ package v1

import (
"context"
"encoding/json"
"math"
"net/http"
"sort"
@@ -70,6 +71,7 @@ const (
StoreMatcherParam = "storeMatch[]"
Step = "step"
Stats = "stats"
ShardInfoParam = "shard_info"
)

// QueryAPI is an API used by Thanos Querier.
@@ -295,6 +297,25 @@ func (qapi *QueryAPI) parseStep(r *http.Request, defaultRangeQueryStep time.Dura
return d, nil
}

func (qapi *QueryAPI) parseShardInfo(r *http.Request) (*storepb.ShardInfo, *api.ApiError) {
data := r.FormValue(ShardInfoParam)
if data == "" {
return nil, nil
}

if len(data) == 0 {
return nil, nil
}

var info storepb.ShardInfo
err := json.Unmarshal([]byte(data), &info)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrapf(err, "'%s' parameter", ShardInfoParam)}
}

return &info, nil
}

func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiError) {
ts, err := parseTimeParam(r, "time", qapi.baseAPI.Now())
if err != nil {
@@ -338,13 +359,18 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr
}

shardInfo, apiErr := qapi.parseShardInfo(r)
if apiErr != nil {
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false), r.FormValue("query"), ts)
qry, err := qe.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
@@ -451,6 +477,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

shardInfo, apiErr := qapi.parseShardInfo(r)
if apiErr != nil {
return nil, nil, apiErr
}

qe := qapi.queryEngine(maxSourceResolution)

// Record the query range requested.
@@ -461,7 +492,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qe.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, maxSourceResolution, enablePartialResponse, qapi.enableQueryPushdown, false, shardInfo),
r.FormValue("query"),
start,
end,
@@ -534,7 +565,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil).
Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
@@ -621,7 +652,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeDebugMatchers, math.MaxInt64, enablePartialResponse, qapi.enableQueryPushdown, true, nil).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
@@ -671,7 +702,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true).
q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, qapi.enableQueryPushdown, true, nil).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
101 changes: 68 additions & 33 deletions pkg/info/infopb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/info/infopb/rpc.proto
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ message InfoResponse {
message StoreInfo {
int64 min_time = 1;
int64 max_time = 2;
bool supports_sharding = 3;
}

// RulesInfo holds the metadata related to Rules API exposed by the component.
11 changes: 11 additions & 0 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
@@ -760,6 +760,17 @@ func (er *endpointRef) TimeRange() (mint, maxt int64) {
return er.metadata.Store.MinTime, er.metadata.Store.MaxTime
}

func (er *endpointRef) SupportsSharding() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

if er.metadata == nil || er.metadata.Store == nil {
return false
}

return er.metadata.Store.SupportsSharding
}

func (er *endpointRef) String() string {
mint, maxt := er.TimeRange()
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", er.addr, labelpb.PromLabelSetsToString(er.LabelSets()), mint, maxt)
4 changes: 4 additions & 0 deletions pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go
Original file line number Diff line number Diff line change
@@ -204,6 +204,10 @@ func (s *storeRef) TimeRange() (int64, int64) {
return s.minTime, s.maxTime
}

func (s *storeRef) SupportsSharding() bool {
return false
}

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, labelpb.PromLabelSetsToString(s.LabelSets()), mint, maxt)
Loading

0 comments on commit 6a4a646

Please sign in to comment.