Skip to content

Commit

Permalink
Fix regression in bloom gateway that caused nothing to be filtered
Browse files Browse the repository at this point in the history
Pull request #14661 added the series labels to the FilterChunkRefs
request in order to be able to filter out full series that do not
contain the filter key when using bloom filters.

However, this chage incorrectly assumed that `Chunk.Metric` from the
`GetChunks()` call contains the labels of the stream. This information
is only populated once the chunk is fetched, though. Therefore the list
of labels was empty, and no chunk refs were filtered.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Nov 6, 2024
1 parent b05b80c commit 67ad033
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions pkg/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func buildResponses(query seriesindex.Query, batch seriesindex.ReadBatchResult,

func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequest) (result *logproto.GetChunkRefResponse, err error) {
logger := util_log.WithContext(ctx, g.log)
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.GetChunkRef")

instanceID, err := tenant.TenantID(ctx)
if err != nil {
Expand All @@ -225,16 +226,12 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return nil, err
}

series := make(map[uint64]labels.Labels)
result = &logproto.GetChunkRefResponse{
Refs: make([]*logproto.ChunkRef, 0, len(chunks)),
}
for _, cs := range chunks {
for i := range cs {
result.Refs = append(result.Refs, &cs[i].ChunkRef)
if _, ok := series[cs[i].Fingerprint]; !ok {
series[cs[i].Fingerprint] = cs[i].Metric
}
}
}

Expand All @@ -261,10 +258,22 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ
return result, nil
}

chunkRefs, used, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, series, result.Refs, req.Plan)
// Doing a "duplicate" index lookup is not ideal,
// however, modifying the GetChunkRef() response, which contains the logproto.ChunkRef is neither.
start := time.Now()
series, err := g.indexQuerier.GetSeries(ctx, instanceID, req.From, req.Through, matchers...)
seriesMap := make(map[uint64]labels.Labels, len(series))
for _, s := range series {
seriesMap[s.Hash()] = s
}
sp.LogKV("msg", "indexQuerier.GetSeries", "duration", time.Since(start), "count", len(series))

start = time.Now()
chunkRefs, used, err := g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, seriesMap, result.Refs, req.Plan)
if err != nil {
return nil, err
}
sp.LogKV("msg", "bloomQuerier.FilterChunkRefs", "duration", time.Since(start))

result.Refs = chunkRefs
level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", initialChunkCount, "filtered", len(result.Refs), "used_blooms", used)
Expand Down

0 comments on commit 67ad033

Please sign in to comment.