Skip to content

Commit

Permalink
fix: Do not filter out chunks for store when From==Through and `Fro…
Browse files Browse the repository at this point in the history
…m==start` (#13117)

This PR fixes a bug where chunks are incorrectly filtered out when their `From` timestamp is equal to their `Through` timestamp (which happens when there is a single log line) and the `From` timestamp is equal to the `from` time of the of the request.

How to reproduce:

1. Insert a single log line with a timestamp exactly at point of an hour
2. Flush ingester
3. Query log line with a split interval of 1h

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Ed Welch <edward.welch@grafana.com>
(cherry picked from commit d9cc513)
  • Loading branch information
chaudum authored and grafana-delivery-bot[bot] committed Jun 4, 2024
1 parent 00d3c7a commit 67b0637
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/stores/composite_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (c CompositeStore) GetChunks(
) ([][]chunk.Chunk, []*fetcher.Fetcher, error) {
chunkIDs := [][]chunk.Chunk{}
fetchers := []*fetcher.Fetcher{}
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
ids, fetcher, err := store.GetChunks(innerCtx, userID, from, through, predicate, storeChunksOverride)
err := c.forStores(ctx, from, through, func(innerCtx context.Context, innerFrom, innerThrough model.Time, store Store) error {
ids, fetcher, err := store.GetChunks(innerCtx, userID, innerFrom, innerThrough, predicate, storeChunksOverride)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/stores/composite_store_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (c *storeEntry) GetChunks(
func filterForTimeRange(refs []*logproto.ChunkRef, from, through model.Time) []chunk.Chunk {
filtered := make([]chunk.Chunk, 0, len(refs))
for _, ref := range refs {
if through >= ref.From && from < ref.Through {
// Only include chunks where the query start time (from) is < the chunk end time (ref.Through)
// and the query end time (through) is >= the chunk start time (ref.From)
// A special case also exists where a chunk can contain a single log line which results in ref.From being equal to ref.Through, and that is equal to the from time.
if (through >= ref.From && from < ref.Through) || (ref.From == from && ref.Through == from) {
filtered = append(filtered, chunk.Chunk{
ChunkRef: *ref,
})
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/stores/composite_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,23 @@ func TestFilterForTimeRange(t *testing.T) {
through: 7,
exp: mkChks(5, 7),
},
{
desc: "ref with from == through",
input: []*logproto.ChunkRef{
{From: 1, Through: 1}, // outside
{From: 2, Through: 2}, // ref.From == from == ref.Through
{From: 3, Through: 3}, // inside
{From: 4, Through: 4}, // ref.From == through == ref.Through
{From: 5, Through: 5}, // outside
},
from: 2,
through: 4,
exp: []chunk.Chunk{
{ChunkRef: logproto.ChunkRef{From: 2, Through: 2}},
{ChunkRef: logproto.ChunkRef{From: 3, Through: 3}},
{ChunkRef: logproto.ChunkRef{From: 4, Through: 4}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
got := filterForTimeRange(tc.input, tc.from, tc.through)
Expand Down

0 comments on commit 67b0637

Please sign in to comment.