Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixes pattern pruning stability #13429

Merged
merged 2 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 72 additions & 44 deletions pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,27 @@ import (
"errors"
"math"
"net/http"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"

loki_iter "github.com/grafana/loki/v3/pkg/iter"
pattern_iter "github.com/grafana/loki/v3/pkg/pattern/iter"
)

// TODO(kolesnikovae): parametrise QueryPatternsRequest
const minClusterSize = 30
const (
minClusterSize = 30
maxPatterns = 300
)

var ErrParseQuery = errors.New("only byte_over_time and count_over_time queries without filters are supported")

Expand Down Expand Up @@ -132,36 +136,63 @@ func (q *IngesterQuerier) querySample(ctx context.Context, req *logproto.QuerySa
return iterators, nil
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
pruneConfig := drain.DefaultConfig()
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse {
patternsBefore := len(resp.Series)
d := drain.New(pruneConfig, "", nil)
for _, p := range resp.Series {
d.TrainPattern(p.GetPattern(), p.Samples)
total := make([]int64, len(resp.Series))

for i, p := range resp.Series {
for _, s := range p.Samples {
total[i] += s.Value
}
}

resp.Series = resp.Series[:0]
for _, cluster := range d.Clusters() {
if cluster.Size < minClusterSize {
continue
// Create a slice of structs to keep Series and total together
type SeriesWithTotal struct {
Series *logproto.PatternSeries
Total int64
}

seriesWithTotals := make([]SeriesWithTotal, len(resp.Series))
for i := range resp.Series {
seriesWithTotals[i] = SeriesWithTotal{
Series: resp.Series[i],
Total: total[i],
}
pattern := d.PatternString(cluster)
if pattern == "" {
continue
}

// Sort the slice of structs by the Total field
sort.Slice(seriesWithTotals, func(i, j int) bool {
return seriesWithTotals[i].Total > seriesWithTotals[j].Total
})

// Initialize a variable to keep track of the position for valid series
pos := 0

// Iterate over the seriesWithTotals
for i := range seriesWithTotals {
if seriesWithTotals[i].Total >= minClusterSize {
// Place the valid series at the current position
resp.Series[pos] = seriesWithTotals[i].Series
pos++
}
resp.Series = append(resp.Series,
logproto.NewPatternSeries(pattern, cluster.Samples()))
}

// Slice the resp.Series to include only the valid series
resp.Series = resp.Series[:pos]

if len(resp.Series) > maxPatterns {
resp.Series = resp.Series[:maxPatterns]
}

metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series)))
metrics.patternsRetainedTotal.Add(float64(len(resp.Series)))

return resp
}

// forAllIngesters runs f, in parallel, for all ingesters
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
replicationSet, err := q.ringClient.Ring().GetReplicationSetForOperation(ring.Read)
replicationSet, err := q.ringClient.Ring().GetAllHealthy(ring.Read)
if err != nil {
return nil, err
}
Expand All @@ -174,32 +205,29 @@ type ResponseFromIngesters struct {
response interface{}
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) {
cfg := ring.DoUntilQuorumConfig{
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (ResponseFromIngesters, error) {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}

resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return ResponseFromIngesters{addr: ingester.Addr}, err
}

return ResponseFromIngesters{ingester.Addr, resp}, nil
}, func(ResponseFromIngesters) {
// Nothing to do
})
if err != nil {
g, ctx := errgroup.WithContext(ctx)
responses := make([]ResponseFromIngesters, len(replicationSet.Instances))

for i, ingester := range replicationSet.Instances {
ingester := ingester
i := i
g.Go(func() error {
client, err := q.ringClient.Pool().GetClientFor(ingester.Addr)
if err != nil {
return err
}

resp, err := f(ctx, client.(logproto.PatternClient))
if err != nil {
return err
}
responses[i] = ResponseFromIngesters{addr: ingester.Addr, response: resp}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}

responses := make([]ResponseFromIngesters, 0, len(results))
responses = append(responses, results...)

return responses, err
return responses, nil
}
Loading
Loading