Introduces cache to TSDB postings #9621
@@ -567,3 +567,120 @@ func TestSchedulerRing(t *testing.T) {
		assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
	})
}

func TestQueryTSDB_WithCachedPostings(t *testing.T) {
	clu := cluster.New(nil, cluster.SchemaWithTSDB)

	defer func() {
		assert.NoError(t, clu.Cleanup())
	}()

	var (
		tDistributor = clu.AddComponent(
			"distributor",
			"-target=distributor",
		)
		tIndexGateway = clu.AddComponent(
			"index-gateway",
			"-target=index-gateway",
			"-tsdb.enable-postings-cache=true",
		)
	)
	require.NoError(t, clu.Run())

	var (
		tIngester = clu.AddComponent(
			"ingester",
			"-target=ingester",
			"-ingester.flush-on-shutdown=true",
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
		tQueryScheduler = clu.AddComponent(
			"query-scheduler",
			"-target=query-scheduler",
			"-query-scheduler.use-scheduler-ring=false",
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
		tCompactor = clu.AddComponent(
			"compactor",
			"-target=compactor",
			"-boltdb.shipper.compactor.compaction-interval=1s",
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
	)
	require.NoError(t, clu.Run())

	// finally, run the query-frontend and querier.
	var (
		tQueryFrontend = clu.AddComponent(
			"query-frontend",
			"-target=query-frontend",
			"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
			"-frontend.default-validity=0s",
			"-common.compactor-address="+tCompactor.HTTPURL(),
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
		_ = clu.AddComponent(
			"querier",
			"-target=querier",
			"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
			"-common.compactor-address="+tCompactor.HTTPURL(),
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
	)
	require.NoError(t, clu.Run())

	tenantID := randStringRunes()

	now := time.Now()
	cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
	cliDistributor.Now = now
	cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
	cliIngester.Now = now
	cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
	cliQueryFrontend.Now = now

	// end of setup. Ingestion and querying below.

	t.Run("ingest-logs", func(t *testing.T) {
		require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-72*time.Hour), map[string]string{"job": "fake"}))
		require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
	})

	// restart ingester which should flush the chunks and index
	require.NoError(t, tIngester.Restart())
Review comment: we could also hit
Reply: this part was just copied from the other tests
	// Query lines
	t.Run("query to verify logs being served from storage", func(t *testing.T) {
		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
		require.NoError(t, err)
		assert.Equal(t, "streams", resp.Data.ResultType)

		var lines []string
		for _, stream := range resp.Data.Stream {
			for _, val := range stream.Values {
				lines = append(lines, val[1])
			}
		}

		assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
	})

	// ingest logs with ts=now.
	require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
	require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))

	// default length is 7 days.
	resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
	require.NoError(t, err)
	assert.Equal(t, "streams", resp.Data.ResultType)

	var lines []string
	for _, stream := range resp.Data.Stream {
		for _, val := range stream.Values {
			lines = append(lines, val[1])
		}
	}
	// expect lines from both the ingesters' memory and the store.
	assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
}
@@ -0,0 +1,157 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/postings_codec.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Thanos Authors.

package tsdb

import (
	"context"
	"fmt"
	"sort"
	"strings"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"

	"github.com/grafana/loki/pkg/storage/chunk/cache"
	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
)

type PostingsReader interface {
	ForPostings(ctx context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error
}

Review comment: nit: I think I suggested this name in one of my previous reviews, but I don't think it really fits. It's not an interface for reading the postings, but rather for applying a function over the read postings. Maybe you can find a more suitable name.
Reply: hmm, I think Iterator is misleading; it doesn't support the Iterator interface/pattern (.Next(), .At(), etc.).
// NewCachedPostingsReader uses the cache defined by `index_read_cache` to store and read Postings.
//
// The cache key is stored/read as `matchers:reader_checksum`.
//
// The cache value is stored as: `[n, refs...]`, where n is how many series references this entry has, and refs is
// a sequence of series references encoded as the diff between the current series and the previous one.
//
// Example: if the postings for stream "app=kubernetes,env=production" are `[1,7,30,50]` and its reader has `checksum=12345`:
// - The cache key for the entry will be: `app=kubernetes,env=production:12345`
// - The cache value for the entry will be: [4, 1, 6, 23, 20].
func NewCachedPostingsReader(reader IndexReader, logger log.Logger, cacheClient cache.Cache) PostingsReader {
	return &cachedPostingsReader{
		reader:      reader,
		cacheClient: cacheClient,
		log:         logger,
	}
}

type cachedPostingsReader struct {
	reader IndexReader

	cacheClient cache.Cache

	log log.Logger
}

func (c *cachedPostingsReader) ForPostings(ctx context.Context, matchers []*labels.Matcher, fn func(index.Postings) error) error {
	checksum := c.reader.Checksum()
	key := fmt.Sprintf("%s:%d", CanonicalLabelMatchersKey(matchers), checksum)
	if postings, got := c.fetchPostings(ctx, key); got {
		return fn(postings)
	}

	p, err := PostingsForMatchers(c.reader, nil, matchers...)
	if err != nil {
		return fmt.Errorf("failed to evaluate postings for matchers: %w", err)
	}

	expandedPosts, err := index.ExpandPostings(p)
	if err != nil {
		return fmt.Errorf("failed to expand postings: %w", err)
	}

	if err := c.storePostings(ctx, expandedPosts, key); err != nil {
		level.Error(c.log).Log("msg", "failed to cache postings", "err", err, "matchers", key)
	}

	// because `index.ExpandPostings` walks the iterator, we have to reset its current index by instantiating a new ListPostings.
	return fn(index.NewListPostings(expandedPosts))
}
func (c *cachedPostingsReader) storePostings(ctx context.Context, expandedPostings []storage.SeriesRef, canonicalMatchers string) error {
	buf, err := diffVarintSnappyEncode(index.NewListPostings(expandedPostings), len(expandedPostings))
	if err != nil {
		return fmt.Errorf("couldn't encode postings: %w", err)
	}

	return c.cacheClient.Store(ctx, []string{canonicalMatchers}, [][]byte{buf})
}

func (c *cachedPostingsReader) fetchPostings(ctx context.Context, key string) (index.Postings, bool) {
	found, bufs, _, err := c.cacheClient.Fetch(ctx, []string{key})

	if err != nil {
		level.Error(c.log).Log("msg", "error on fetching postings", "err", err, "matchers", key)
		return nil, false
	}

	if len(found) > 0 {
		// we only use a single key so we only care about index=0.
		p, err := decodeToPostings(bufs[0])
		if err != nil {
			level.Error(c.log).Log("msg", "failed to fetch postings", "err", err)
			return nil, false
		}
		return p, true
	}

	return nil, false
}

func decodeToPostings(b []byte) (index.Postings, error) {
	p, err := diffVarintSnappyDecode(b)
	if err != nil {
		return nil, fmt.Errorf("couldn't decode postings: %w", err)
	}

	return p, nil
}

// CanonicalLabelMatchersKey creates a canonical version of LabelMatchersKey
func CanonicalLabelMatchersKey(ms []*labels.Matcher) string {
	sorted := make([]labels.Matcher, len(ms))
	for i := range ms {
		sorted[i] = labels.Matcher{Type: ms[i].Type, Name: ms[i].Name, Value: ms[i].Value}
	}
	sort.Sort(sortedLabelMatchers(sorted))
Review comment: nit: it's a little weird we're sorting something called `sorted`.
	const (
		typeLen = 2
		sepLen  = 1
	)
	var size int
	for _, m := range sorted {
		size += len(m.Name) + len(m.Value) + typeLen + sepLen
	}
	sb := strings.Builder{}
	sb.Grow(size)
	for _, m := range sorted {
		sb.WriteString(m.Name)
		sb.WriteString(m.Type.String())
		sb.WriteString(m.Value)
		sb.WriteByte(',')
	}
	return sb.String()
}

type sortedLabelMatchers []labels.Matcher

func (c sortedLabelMatchers) Less(i, j int) bool {
	if c[i].Name != c[j].Name {
		return c[i].Name < c[j].Name
	}
	if c[i].Type != c[j].Type {
		return c[i].Type < c[j].Type
	}
	return c[i].Value < c[j].Value
}

func (c sortedLabelMatchers) Len() int      { return len(c) }
func (c sortedLabelMatchers) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
Review comment: this test passes with this flag set to false. Is there any way to make this test fail when caching is not happening? Where are the objects coming from (local filesystem?), and could we change the data at rest to prove the data we're getting is from cache?
Reply: I added an assertion that checks the cache added/misses/gets metrics results. They guarantee we're covering the cache behavior using the FIFO cache.
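
As a rough illustration only of the metrics-based check described in that reply: the metric names and the scrape text below are placeholders, not the names asserted on in this PR, but they show the shape of an added/gets/misses check against a Prometheus text-format dump.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// counterValue pulls a single counter out of a Prometheus text-format dump.
// In the integration test, such a dump would come from the component's
// /metrics endpoint after running the same query twice (cold, then warm).
func counterValue(dump, name string) float64 {
	for _, line := range strings.Split(dump, "\n") {
		if strings.HasPrefix(line, name+" ") {
			fields := strings.Fields(line)
			v, _ := strconv.ParseFloat(fields[len(fields)-1], 64)
			return v
		}
	}
	return 0
}

func main() {
	// Placeholder metric names and values standing in for a real scrape.
	dump := "postings_cache_added_total 1\npostings_cache_gets_total 2\npostings_cache_misses_total 1\n"

	added := counterValue(dump, "postings_cache_added_total")
	gets := counterValue(dump, "postings_cache_gets_total")
	misses := counterValue(dump, "postings_cache_misses_total")

	// The cold query stores one entry; the warm query must register a hit,
	// so gets should exceed misses.
	fmt.Println(added >= 1 && gets > misses) // prints true
}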