Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cache): Add Cache-Control: no-cache support for Loki instant queries. #12896

Merged
merged 11 commits into from
May 19, 2024
1 change: 1 addition & 0 deletions cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func newQueryClient(app *kingpin.Application) client.Client {
app.Flag("key", "Path to the client certificate key. Can also be set using LOKI_CLIENT_KEY_PATH env var.").Default("").Envar("LOKI_CLIENT_KEY_PATH").StringVar(&client.TLSConfig.KeyFile)
app.Flag("org-id", "adds X-Scope-OrgID to API requests for representing tenant ID. Useful for requesting tenant data when bypassing an auth gateway. Can also be set using LOKI_ORG_ID env var.").Default("").Envar("LOKI_ORG_ID").StringVar(&client.OrgID)
app.Flag("query-tags", "adds X-Query-Tags http header to API requests. This header value will be part of `metrics.go` statistics. Useful for tracking the query. Can also be set using LOKI_QUERY_TAGS env var.").Default("").Envar("LOKI_QUERY_TAGS").StringVar(&client.QueryTags)
app.Flag("nocache", "adds Cache-Control: no-cache http header to API requests. Can also be set using LOKI_NO_CACHE env var.").Default("false").Envar("LOKI_NO_CACHE").BoolVar(&client.NoCache)
app.Flag("bearer-token", "adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN env var.").Default("").Envar("LOKI_BEARER_TOKEN").StringVar(&client.BearerToken)
app.Flag("bearer-token-file", "adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN_FILE env var.").Default("").Envar("LOKI_BEARER_TOKEN_FILE").StringVar(&client.BearerTokenFile)
app.Flag("retries", "How many times to retry each query when getting an error response from Loki. Can also be set using LOKI_CLIENT_RETRIES env var.").Default("0").Envar("LOKI_CLIENT_RETRIES").IntVar(&client.Retries)
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
)

type errTooFarBehind struct {
// original timestmap of the entry itself.
// original timestamp of the entry itself.
entryTs time.Time

// cutoff is the oldest acceptable timstamp of the `stream` that entry belongs to.
Expand Down
15 changes: 13 additions & 2 deletions pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ const (
volumeRangePath = "/loki/api/v1/index/volume_range"
detectedFieldsPath = "/loki/api/v1/detected_fields"
defaultAuthHeader = "Authorization"

// HTTP header keys
HTTPScopeOrgID = "X-Scope-OrgID"
HTTPQueryTags = "X-Query-Tags"
HTTPCacheControl = "Cache-Control"
HTTPCacheControlNoCache = "no-cache"
)

var userAgent = fmt.Sprintf("loki-logcli/%s", build.Version)
Expand Down Expand Up @@ -77,6 +83,7 @@ type DefaultClient struct {
BearerTokenFile string
Retries int
QueryTags string
NoCache bool
AuthHeader string
ProxyURL string
BackoffConfig BackoffConfig
Expand Down Expand Up @@ -372,11 +379,15 @@ func (c *DefaultClient) getHTTPRequestHeader() (http.Header, error) {
h.Set("User-Agent", userAgent)

if c.OrgID != "" {
h.Set("X-Scope-OrgID", c.OrgID)
h.Set(HTTPScopeOrgID, c.OrgID)
}

if c.NoCache {
h.Set(HTTPCacheControl, HTTPCacheControlNoCache)
}

if c.QueryTags != "" {
h.Set("X-Query-Tags", c.QueryTags)
h.Set(HTTPQueryTags, c.QueryTags)
}

if (c.Username != "" || c.Password != "") && (len(c.BearerToken) > 0 || len(c.BearerTokenFile) > 0) {
Expand Down
76 changes: 66 additions & 10 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util"
)

Expand All @@ -42,6 +43,7 @@ type Params interface {
Shards() []string
GetExpression() syntax.Expr
GetStoreChunks() *logproto.ChunkRefGroup
CachingOptions() resultscache.CachingOptions
}

func NewLiteralParams(
Expand All @@ -52,22 +54,70 @@ func NewLiteralParams(
limit uint32,
shards []string,
storeChunks *logproto.ChunkRefGroup,
) (LiteralParams, error) {
return newLiteralParams(
qs,
start,
end,
step,
interval,
direction,
limit,
shards,
storeChunks,
resultscache.CachingOptions{},
)
}

func NewLiteralParamsWithCaching(
qs string,
start, end time.Time,
step, interval time.Duration,
direction logproto.Direction,
limit uint32,
shards []string,
storeChunks *logproto.ChunkRefGroup,
cachingOptions resultscache.CachingOptions,
) (LiteralParams, error) {
return newLiteralParams(
qs,
start,
end,
step,
interval,
direction,
limit,
shards,
storeChunks,
cachingOptions,
)
}

func newLiteralParams(
qs string,
start, end time.Time,
step, interval time.Duration,
direction logproto.Direction,
limit uint32,
shards []string,
storeChunks *logproto.ChunkRefGroup,
cachingOptions resultscache.CachingOptions,
) (LiteralParams, error) {
p := LiteralParams{
queryString: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
shards: shards,
storeChunks: storeChunks,
queryString: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
shards: shards,
storeChunks: storeChunks,
cachingOptions: cachingOptions,
}
var err error
p.queryExpr, err = syntax.ParseExpr(qs)
return p, err

}

// LiteralParams impls Params
Expand All @@ -80,6 +130,7 @@ type LiteralParams struct {
shards []string
queryExpr syntax.Expr
storeChunks *logproto.ChunkRefGroup
cachingOptions resultscache.CachingOptions
}

func (p LiteralParams) Copy() LiteralParams { return p }
Expand Down Expand Up @@ -114,6 +165,11 @@ func (p LiteralParams) Shards() []string { return p.shards }
// StoreChunks impls Params
func (p LiteralParams) GetStoreChunks() *logproto.ChunkRefGroup { return p.storeChunks }

// CachingOptions returns whether Loki query created from this params should be cached.
func (p LiteralParams) CachingOptions() resultscache.CachingOptions {
return p.cachingOptions
}

// GetRangeType returns whether a query is an instant query or range query
func GetRangeType(q Params) QueryRangeType {
if q.Start() == q.End() && q.Step() == 0 {
Expand Down
48 changes: 43 additions & 5 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ import (
"github.com/grafana/loki/v3/pkg/util/querylimits"
)

const (
cacheControlHeader = "Cache-Control"
noCacheVal = "no-cache"
)

var DefaultCodec = &Codec{}

type Codec struct{}
Expand Down Expand Up @@ -95,8 +100,6 @@ func (r *LokiRequest) LogToSpan(sp opentracing.Span) {
)
}

func (*LokiRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return }

func (r *LokiInstantRequest) GetStep() int64 {
return 0
}
Expand Down Expand Up @@ -142,8 +145,6 @@ func (r *LokiInstantRequest) LogToSpan(sp opentracing.Span) {
)
}

func (*LokiInstantRequest) GetCachingOptions() (res queryrangebase.CachingOptions) { return }

func (r *LokiSeriesRequest) GetEnd() time.Time {
return r.EndTs
}
Expand Down Expand Up @@ -329,20 +330,29 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

disableCacheReq := false

if strings.ToLower(strings.TrimSpace(r.Header.Get(cacheControlHeader))) == noCacheVal {
disableCacheReq = true
}

switch op := getOperation(r.URL.Path); op {
case QueryRangeOp:
req, err := parseRangeQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

return req, nil
case InstantQueryOp:
req, err := parseInstantQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

req.CachingOptions = queryrangebase.CachingOptions{
Disabled: disableCacheReq,
}

return req, nil
case SeriesOp:
req, err := loghttp.ParseAndValidateSeriesQuery(r)
Expand Down Expand Up @@ -1808,6 +1818,10 @@ func (p paramsRangeWrapper) Shards() []string {
return p.GetShards()
}

func (p paramsRangeWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsInstantWrapper struct {
*LokiInstantRequest
}
Expand Down Expand Up @@ -1840,6 +1854,10 @@ func (p paramsInstantWrapper) Shards() []string {
return p.GetShards()
}

func (p paramsInstantWrapper) CachingOptions() resultscache.CachingOptions {
return p.LokiInstantRequest.CachingOptions
}

type paramsSeriesWrapper struct {
*LokiSeriesRequest
}
Expand Down Expand Up @@ -1876,6 +1894,10 @@ func (p paramsSeriesWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsSeriesWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsLabelWrapper struct {
*LabelRequest
}
Expand Down Expand Up @@ -1912,6 +1934,10 @@ func (p paramsLabelWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsLabelWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsStatsWrapper struct {
*logproto.IndexStatsRequest
}
Expand Down Expand Up @@ -1948,6 +1974,10 @@ func (p paramsStatsWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsStatsWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

type paramsDetectedFieldsWrapper struct {
*DetectedFieldsRequest
}
Expand Down Expand Up @@ -2040,6 +2070,14 @@ func (p paramsDetectedFieldsWrapper) GetStoreChunks() *logproto.ChunkRefGroup {
return nil
}

func (p paramsDetectedLabelsWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

func (p paramsDetectedFieldsWrapper) CachingOptions() resultscache.CachingOptions {
return resultscache.CachingOptions{}
}

func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrangebase.PrometheusResponseHeader {
var promHeaders []queryrangebase.PrometheusResponseHeader
for h, hv := range httpHeaders {
Expand Down
49 changes: 49 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,55 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
}
}

func Test_codec_DecodeRequest_cacheHeader(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "1")

tests := []struct {
name string
reqBuilder func() (*http.Request, error)
want queryrangebase.Request
}{
{
"query_instant",
func() (*http.Request, error) {
req, err := http.NewRequest(
http.MethodGet,
fmt.Sprintf(`/v1/query?time=%d&query={foo="bar"}&limit=200&direction=FORWARD`, start.UnixNano()),
nil,
)
if err == nil {
req.Header.Set(cacheControlHeader, noCacheVal)
}
return req, err
},
&LokiInstantRequest{
Query: `{foo="bar"}`,
Limit: 200,
Direction: logproto.FORWARD,
Path: "/v1/query",
TimeTs: start,
Plan: &plan.QueryPlan{
AST: syntax.MustParseExpr(`{foo="bar"}`),
},
CachingOptions: queryrangebase.CachingOptions{
Disabled: true,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := tt.reqBuilder()
if err != nil {
t.Fatal(err)
}
got, err := DefaultCodec.DecodeRequest(ctx, req, nil)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}

func Test_codec_DecodeResponse(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func ParamsToLokiRequest(params logql.Params) queryrangebase.Request {
Plan: &plan.QueryPlan{
AST: params.GetExpression(),
},
StoreChunks: params.GetStoreChunks(),
StoreChunks: params.GetStoreChunks(),
CachingOptions: params.CachingOptions(),
}
}
return &LokiRequest{
Expand All @@ -61,7 +62,8 @@ func ParamsToLokiRequest(params logql.Params) queryrangebase.Request {
Plan: &plan.QueryPlan{
AST: params.GetExpression(),
},
StoreChunks: params.GetStoreChunks(),
StoreChunks: params.GetStoreChunks(),
CachingOptions: params.CachingOptions(),
}
}

Expand Down
Loading
Loading