Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add experimental Select() cache #36

Open
wants to merge 1 commit into
base: v0.29.0+vinted
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
Expand Down Expand Up @@ -79,6 +80,8 @@ func registerQuery(app *extkingpin.App) {
httpBindAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd)
grpcBindAddr, grpcGracePeriod, grpcCert, grpcKey, grpcClientCA, grpcMaxConnAge := extkingpin.RegisterGRPCFlags(cmd)

selectCacheSize := cmd.Flag("grpc-select-cache-size", "How many bytes should be allocated for storing Select() results in memory for each StoreAPI").Default("0").Bytes()

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool()
cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
Expand Down Expand Up @@ -319,6 +322,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
uint64(*selectCacheSize),
)
})
}
Expand Down Expand Up @@ -395,6 +399,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
selectCacheSize uint64,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand All @@ -409,7 +414,7 @@ func runQuery(
Help: "The number of times a duplicated store addresses is detected from the different configs in query",
})

dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName)
dialOpts, err := extgrpc.StoreClientGRPCOpts(logger, reg, tracer, secure, skipVerify, cert, key, caCert, serverName, model.Bytes(selectCacheSize))
if err != nil {
return errors.Wrap(err, "building gRPC client")
}
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func runReceive(
conf.rwClientKey,
conf.rwClientServerCA,
conf.rwClientServerName,
0,
)
if err != nil {
return err
Expand Down
10 changes: 9 additions & 1 deletion pkg/extgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,23 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/thanos-io/thanos/pkg/extgrpc/grpccache"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/tracing"
)

// StoreClientGRPCOpts creates gRPC dial options for connecting to a store client.
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string) ([]grpc.DialOption, error) {
func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, secure, skipVerify bool, cert, key, caCert, serverName string, maxCacheSize model.Bytes) ([]grpc.DialOption, error) {
Copy link

@Titasp Titasp Dec 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style question, wouldn't it be better to refactor long function like this? (so that you wouldn't need to use horizontal slider to see all function params 😄 )

func StoreClientGRPCOpts(
  logger log.Logger, 
  reg *prometheus.Registry, 
  tracer opentracing.Tracer,
  secure, skipVerify bool, 
  cert, key, caCert, serverName string, 
  maxCacheSize model.Bytes,
) ([]grpc.DialOption, error) {
   //...
}

grpcMets := grpc_prometheus.NewClientMetrics()
grpcMets.EnableClientHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}),
)

cacher, err := grpccache.NewSeriesRequestcachingInterceptor(reg, maxCacheSize, 128*1024*1024)
if err != nil {
return nil, err
}
dialOpts := []grpc.DialOption{
// We want to make sure that we can receive huge gRPC messages from storeAPI.
// On TCP level we can be fine, but the gRPC overhead for huge messages could be significant.
Expand All @@ -42,6 +49,7 @@ func StoreClientGRPCOpts(logger log.Logger, reg *prometheus.Registry, tracer ope
grpc_middleware.ChainStreamClient(
grpcMets.StreamClientInterceptor(),
tracing.StreamClientInterceptor(tracer),
cacher.GetInterceptor(),
),
),
}
Expand Down
202 changes: 202 additions & 0 deletions pkg/extgrpc/grpccache/interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package grpccache

import (
"context"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/store/storepb"
"google.golang.org/grpc"
)

func getResponses(c *cache.InMemoryCache, reqSig string) ([]*storepb.SeriesResponse, error) {
numKeysKey := fmt.Sprintf("%s-num", reqSig)
numKeysFetchRes := c.Fetch(context.Background(), []string{numKeysKey})

var numKeys int
if numKeysBytes, ok := numKeysFetchRes[numKeysKey]; !ok {
return nil, fmt.Errorf("not found num key")
} else {
if n, err := fmt.Sscanf(string(numKeysBytes), "%d", &numKeys); n != 1 || err != nil {
return nil, fmt.Errorf("parsing %s", string(numKeysBytes))
}
}

var fetchKeys = make([]string, 0, numKeys)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit: var fetchKeys = ... -> fetchKeys := make([]string, 0, numKeys) ?

for i := 0; i < numKeys; i++ {
fetchKeys = append(fetchKeys, fmt.Sprintf("%s-%d", reqSig, i))
}
fetchedData := c.Fetch(context.Background(), fetchKeys)

if len(fetchedData) != numKeys {
return nil, fmt.Errorf("got wrong number of keys: expected %d, got %d", numKeys, len(fetchedData))
}
ret := make([]*storepb.SeriesResponse, 0, numKeys)

for i := 0; i < numKeys; i++ {
var r storepb.SeriesResponse
data := fetchedData[fmt.Sprintf("%s-%d", reqSig, i)]

if err := r.Unmarshal(data); err != nil {
return nil, fmt.Errorf("unmarshaling data from cache: %w", err)
}

ret = append(ret, &r)
}

return ret, nil
}

func putResponses(c *cache.InMemoryCache, reqSig string, responses []*storepb.SeriesResponse) []byte {
keys := map[string][]byte{
fmt.Sprintf("%s-num", reqSig): []byte(fmt.Sprintf("%d", len(responses))),
}

for i, resp := range responses {
m, _ := resp.Marshal()
keys[fmt.Sprintf("%s-%d", reqSig, i)] = m
}

c.Store(context.Background(), keys, 5*time.Minute)
return nil
}

func splitMethodName(fullMethod string) (string, string) {
fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash
if i := strings.Index(fullMethod, "/"); i >= 0 {
return fullMethod[:i], fullMethod[i+1:]
}
return "unknown", "unknown"
}

type seriesInterceptor struct {
grpc.ClientStream
target string
c *cache.InMemoryCache

hashedReq string

responses []*storepb.SeriesResponse
cachedResponsesAvailable bool

cachedCalls prometheus.Counter
}

func hashReqTarget(r *storepb.SeriesRequest, target string) string {
h := xxhash.New()
_, _ = h.WriteString(target)
m, _ := r.Marshal()
_, _ = h.Write(m)

return string(h.Sum(nil))
}

func (i *seriesInterceptor) RecvMsg(m interface{}) error {
if i.hashedReq == "" {
return i.ClientStream.RecvMsg(m)
}
if i.cachedResponsesAvailable {
if len(i.responses) == 0 {
return io.EOF
}
resp, ok := m.(*storepb.SeriesResponse)
if !ok {
panic("should be a series response type")
}
*resp = *i.responses[0]
i.responses = i.responses[1:]
return nil
}

if err := i.ClientStream.RecvMsg(m); err != nil {
if err == io.EOF && len(i.responses) > 0 {
putResponses(i.c, i.hashedReq, i.responses)
}
return err
}
if resp, ok := m.(*storepb.SeriesResponse); ok {
i.responses = append(i.responses, resp)
}
return nil
}

func (i *seriesInterceptor) SendMsg(m interface{}) error {
if req, ok := m.(*storepb.SeriesRequest); ok {
i.hashedReq = hashReqTarget(req, i.target)

responses, err := getResponses(i.c, i.hashedReq)
if err == nil {
i.responses = responses
i.cachedResponsesAvailable = true
i.cachedCalls.Inc()
}

}
return i.ClientStream.SendMsg(m)
}

type SeriesRequestCachingInterceptor struct {
inmemoryCache *cache.InMemoryCache
passthrough bool

cachedCalls prometheus.Counter
}

func NewSeriesRequestcachingInterceptor(reg *prometheus.Registry, maxSize model.Bytes, maxItemSize model.Bytes) (*SeriesRequestCachingInterceptor, error) {
if maxSize == 0 {
maxItemSize = 0
}
imc, err := cache.NewInMemoryCacheWithConfig(
"cachinginterceptor",
log.NewLogfmtLogger(os.Stderr),
reg,
cache.InMemoryCacheConfig{
MaxSize: maxSize,
MaxItemSize: maxItemSize,
},
)
if err != nil {
return nil, err
}
return &SeriesRequestCachingInterceptor{
inmemoryCache: imc,
passthrough: maxSize == 0,
cachedCalls: promauto.With(prometheus.Registerer(reg)).NewCounter(prometheus.CounterOpts{
Name: "thanos_grpc_cached_calls_total",
Help: "How many Series() calls were fully cached",
}),
}, nil
}

func (i *SeriesRequestCachingInterceptor) GetInterceptor() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
cs, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return cs, err
}

if i.passthrough {
return cs, nil
}

if svc, actualMethod := splitMethodName(method); svc == "thanos.Store" && actualMethod == "Series" {
return &seriesInterceptor{
ClientStream: cs,
target: cc.Target(),
c: i.inmemoryCache,
cachedCalls: i.cachedCalls,
}, nil
}

return cs, nil
}
}