-
Notifications
You must be signed in to change notification settings - Fork 547
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The query frontend treats all jobs as the same size when it farms them out to the queriers. This can cause querier instability b/c some jobs actually require quite a bit more resources to execute. By assigning weights to jobs we can reduce the amount each querier is asked to do will hopefully: reduce querier OOMs/timeouts/retries reduce querier latency increase total throughput Other changes Removed the roundtripper httpgrpc bridge and pushed the concept of pipeline.Request all the way down into the cortex frontend code. This can be a nice perf improvement b/c translating http -> httpgrpc is costly and we are pushing it to the last moment. Currently for some queries we are translating thousands of jobs and then throwing them away. Removed redundant parseQuery and createFetchSpansRequest to consolidate on the Compile function in pkg/traceql Check for context error before going through retry logic in retryWare. This causes retry metrics to be more accurate in the event of many cancelled jobs.
- Loading branch information
1 parent
eca7f9c
commit 5aef523
Showing
26 changed files
with
581 additions
and
247 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
package pipeline | ||
|
||
import ( | ||
"github.com/grafana/tempo/modules/frontend/combiner" | ||
"github.com/grafana/tempo/pkg/traceql" | ||
) | ||
|
||
type RequestType int | ||
|
||
type WeightRequest interface { | ||
SetWeight(int) | ||
Weight() int | ||
} | ||
|
||
type WeightsConfig struct { | ||
RequestWithWeights bool `yaml:"request_with_weights,omitempty"` | ||
RetryWithWeights bool `yaml:"retry_with_weights,omitempty"` | ||
MaxTraceQLConditions int `yaml:"max_traceql_conditions,omitempty"` | ||
MaxRegexConditions int `yaml:"max_regex_conditions,omitempty"` | ||
} | ||
|
||
type Weights struct { | ||
DefaultWeight int | ||
TraceQLSearchWeight int | ||
TraceByIDWeight int | ||
MaxTraceQLConditions int | ||
MaxRegexConditions int | ||
} | ||
|
||
const ( | ||
Default RequestType = iota | ||
TraceByID | ||
TraceQLSearch | ||
TraceQLMetrics | ||
) | ||
|
||
type weightRequestWare struct { | ||
requestType RequestType | ||
enabled bool | ||
next AsyncRoundTripper[combiner.PipelineResponse] | ||
|
||
weights Weights | ||
} | ||
|
||
// It increments the weight of a retriyed request | ||
func IncrementRetriedRequestWeight(r WeightRequest) { | ||
r.SetWeight(r.Weight() + 1) | ||
} | ||
|
||
// It returns a new weight request middleware | ||
func NewWeightRequestWare(rt RequestType, cfg WeightsConfig) AsyncMiddleware[combiner.PipelineResponse] { | ||
weights := Weights{ | ||
DefaultWeight: 1, | ||
TraceQLSearchWeight: 1, | ||
TraceByIDWeight: 2, | ||
MaxTraceQLConditions: cfg.MaxTraceQLConditions, | ||
MaxRegexConditions: cfg.MaxRegexConditions, | ||
} | ||
return AsyncMiddlewareFunc[combiner.PipelineResponse](func(next AsyncRoundTripper[combiner.PipelineResponse]) AsyncRoundTripper[combiner.PipelineResponse] { | ||
return &weightRequestWare{ | ||
requestType: rt, | ||
enabled: cfg.RequestWithWeights, | ||
weights: weights, | ||
next: next, | ||
} | ||
}) | ||
} | ||
|
||
func (c weightRequestWare) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) { | ||
c.setWeight(req) | ||
return c.next.RoundTrip(req) | ||
} | ||
|
||
func (c weightRequestWare) setWeight(req Request) { | ||
if !c.enabled { | ||
req.SetWeight(c.weights.DefaultWeight) | ||
return | ||
} | ||
switch c.requestType { | ||
case TraceByID: | ||
req.SetWeight(c.weights.TraceByIDWeight) | ||
case TraceQLSearch, TraceQLMetrics: | ||
c.setTraceQLWeight(req) | ||
default: | ||
req.SetWeight(c.weights.DefaultWeight) | ||
} | ||
} | ||
|
||
func (c weightRequestWare) setTraceQLWeight(req Request) { | ||
var traceQLQuery string | ||
query := req.HTTPRequest().URL.Query() | ||
if query.Has("q") { | ||
traceQLQuery = query.Get("q") | ||
} | ||
if query.Has("query") { | ||
traceQLQuery = query.Get("query") | ||
} | ||
|
||
req.SetWeight(c.weights.TraceQLSearchWeight) | ||
|
||
if traceQLQuery == "" { | ||
return | ||
} | ||
|
||
_, _, _, spanRequest, err := traceql.Compile(traceQLQuery) | ||
if err != nil || spanRequest == nil { | ||
return | ||
} | ||
|
||
conditions := 0 | ||
regexConditions := 0 | ||
|
||
for _, c := range spanRequest.Conditions { | ||
if c.Op != traceql.OpNone { | ||
conditions++ | ||
} | ||
if c.Op == traceql.OpRegex || c.Op == traceql.OpNotRegex { | ||
regexConditions++ | ||
} | ||
} | ||
complexQuery := regexConditions >= c.weights.MaxRegexConditions || conditions >= c.weights.MaxTraceQLConditions | ||
if complexQuery { | ||
req.SetWeight(c.weights.TraceQLSearchWeight + 1) | ||
} | ||
} |
Oops, something went wrong.