Introduces cache to TSDB postings #9621
Merged
Commits
bc286f8 DylanGuedes: hacky cached postings
da40b40 DylanGuedes: Change signatures
2753a89 DylanGuedes: tmp inherit thanos indexcache code
64a0af7 DylanGuedes: make caching configurable
e37da4d DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
369869f DylanGuedes: Implement LRU as a possible cache option
6878e4b DylanGuedes: Add tests
b5be8fe DylanGuedes: delete indexcache folder
f795168 DylanGuedes: undo change
e9d2650 DylanGuedes: trim down code
1ff0663 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
3b5ab8f DylanGuedes: Use postingsclient.
317a879 DylanGuedes: Use right cache name.
66c7f0d DylanGuedes: Rename flag and remove unused var.
71cac4e DylanGuedes: Make the metrics more consistent.
bf330fc DylanGuedes: Pass canonical keys directly.
4a4d1fd DylanGuedes: Pass ctx directly.
88fe691 DylanGuedes: Reuse ctx
604de2f DylanGuedes: Append series numbers to encoded binary
99268de DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
4e784f3 DylanGuedes: Rename to postingsReader
a7d48a8 DylanGuedes: lint fix
bba1de5 DylanGuedes: Implement overflow logic
03d0cc8 DylanGuedes: Fix tests
a98cfea DylanGuedes: Finish fixing tests.
7f4e582 DylanGuedes: Fix lint
8c71ebf DylanGuedes: Fix lint
64ad6ca DylanGuedes: Rename client->reader
e9ef21a DylanGuedes: Update calls (haudi suggestions)
68361f3 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
84f68df DylanGuedes: Fix tests
210ac07 DylanGuedes: Make sure cache is used on tests
eef25e5 DylanGuedes: remvoe consts only used by mimir
9b902d1 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
d27286c DylanGuedes: appease lint (remove TSDB from struct name)
1e6fd33 DylanGuedes: fix lint
2030224 DylanGuedes: fix import order
33521a1 DylanGuedes: fix import order again
52ca301 DylanGuedes: encode with 32b instead of 64b (tsdb uses 32b internally)
2adc870 DylanGuedes: Use "," to separate matchers.
423692b DylanGuedes: better defaults
474261d DylanGuedes: bugged linter
2baf30d DylanGuedes: fix docs
110eac7 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
571c6a8 DylanGuedes: fix config type
4139abd DylanGuedes: Register flags
3d30b1e DylanGuedes: wrap error on vendor
7d9a24b DylanGuedes: test other thing
7edc7bd DylanGuedes: wrap errors
7d7c827 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
8c9fcef DylanGuedes: careful wrapping
5521d82 DylanGuedes: wrap at different place
0ef38b7 DylanGuedes: wrap different
dcdf6d2 DylanGuedes: use sharded postings?
6c36767 DylanGuedes: sanity check
b84e257 DylanGuedes: reset postings by calling PostingsForMatcher again.
5ff3a36 DylanGuedes: sanity check
d9354f5 DylanGuedes: Try calling PostingsForMatchers after cache hit too.
fb9b541 DylanGuedes: another sanity check
d3a2a5b DylanGuedes: debug decoded/encoded series
cd0d3a9 DylanGuedes: was my decoding wrong?
f7d8013 DylanGuedes: cleanup
fe659c8 DylanGuedes: cleanup cached postings file.
f07228d DylanGuedes: revert vendor changes
2f00fb7 DylanGuedes: Undo change to error messages
53162c8 DylanGuedes: Add flag docs
99a4c83 DylanGuedes: remove unnecessary test
8eb1308 DylanGuedes: Use the checksum as part of the key.
c3736af DylanGuedes: Add changelog entry.
8f556ed DylanGuedes: Add functional test.
dad71f9 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
18da973 DylanGuedes: Change "cache_postings" -> "enable_cache_postings"
942d450 DylanGuedes: Apply Haudi suggestion (see https://github.com/grafana/loki/pull/9621…
fe6c834 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
a4979c8 DylanGuedes: update flag used by e2e test
3e0c31d DylanGuedes: Refactor how the caching struct is passed
fd8b411 DylanGuedes: fix lint.
871e3e4 DylanGuedes: Use background writes for LRU cache.
0c4e141 DylanGuedes: Add length=0 bypass.
67852d9 DylanGuedes: Change default max item size.
715273c DylanGuedes: Update docs
d2fc841 DylanGuedes: lint
468914a DylanGuedes: Implements snappy postings decoding/encoding
ab45f97 DylanGuedes: fix formatting
61e1bec DylanGuedes: Remove LRU cache.
ce02398 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
2be199b DylanGuedes: fix microservices test
07926d5 DylanGuedes: Rename enable-postings-cache flag.
6c7433b DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
c3e6e8b DylanGuedes: Apply suggestions from code review
ca33e61 DylanGuedes: fix test
bfc54c4 DylanGuedes: add description docs for tsdbshipper
8ec02b6 DylanGuedes: Merge branch 'main' of github.com:grafana/loki into check-index-posti…
b87c48d DylanGuedes: Test caching behaviro on e2e test.
ee07d76 DylanGuedes: update go.mod
6177240 DylanGuedes: change to sorteablelabelmatchers.
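Several commit messages above ('Use "," to separate matchers.', "Use the checksum as part of the key.", "change to sorteablelabelmatchers.") hint at how a postings cache key could be made canonical. The following is only a rough sketch of that idea, not the code merged in this PR: the `matcher` type, the `cacheKey` function, and the `checksum:matchers` layout are all hypothetical names and choices made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// matcher is a hypothetical stand-in for a label matcher that has already
// been rendered to its string form, for example `job="fake"`.
type matcher string

// cacheKey builds a canonical key from an index checksum and a set of
// matchers. Sorting the matchers first means that two queries listing the
// same matchers in a different order map to the same key.
// The "%08x:" prefix layout is an assumption, not Loki's actual format.
func cacheKey(checksum uint32, matchers []matcher) string {
	parts := make([]string, 0, len(matchers))
	for _, m := range matchers {
		parts = append(parts, string(m))
	}
	sort.Strings(parts)
	return fmt.Sprintf("%08x:%s", checksum, strings.Join(parts, ","))
}

func main() {
	a := cacheKey(0xdeadbeef, []matcher{`job="fake"`, `env="dev"`})
	b := cacheKey(0xdeadbeef, []matcher{`env="dev"`, `job="fake"`})
	fmt.Println(a)
	fmt.Println(a == b)
}
```

Including the checksum in the key means that entries written for one index file are never served for another file that happens to receive the same matchers.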
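The commits "encode with 32b instead of 64b (tsdb uses 32b internally)" and "Append series numbers to encoded binary" suggest that cached postings are stored as a flat binary list of series references. A minimal sketch of one such encoding follows, assuming a big-endian `uint32` count prefix followed by one `uint32` per reference. The function names and the count-prefix layout are assumptions, and the snappy compression added later in the PR is left out.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodePostings writes the number of series references followed by each
// reference, all as big-endian uint32 values (hypothetical layout).
func encodePostings(refs []uint32) []byte {
	buf := make([]byte, 4+4*len(refs))
	binary.BigEndian.PutUint32(buf, uint32(len(refs)))
	for i, r := range refs {
		binary.BigEndian.PutUint32(buf[4+4*i:], r)
	}
	return buf
}

// decodePostings reverses encodePostings and rejects buffers whose length
// does not match the count stored in the first four bytes.
func decodePostings(buf []byte) ([]uint32, error) {
	if len(buf) < 4 {
		return nil, errors.New("postings buffer too short")
	}
	n := int(binary.BigEndian.Uint32(buf))
	body := len(buf) - 4
	if body%4 != 0 || body/4 != n {
		return nil, fmt.Errorf("count %d does not match %d payload bytes", n, body)
	}
	refs := make([]uint32, n)
	for i := range refs {
		refs[i] = binary.BigEndian.Uint32(buf[4+4*i:])
	}
	return refs, nil
}

func main() {
	buf := encodePostings([]uint32{1, 5, 42})
	refs, err := decodePostings(buf)
	fmt.Println(len(buf), refs, err)
}
```

Using 32-bit references halves the cached payload compared with 64-bit ones, which matters when the cache enforces a maximum item size.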
@@ -2,11 +2,15 @@ package integration

import (
	"context"
	"strings"
	"testing"
	"time"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"google.golang.org/protobuf/proto"

	"github.com/grafana/loki/integration/client"
	"github.com/grafana/loki/integration/cluster"

@@ -567,3 +571,185 @@ func TestSchedulerRing(t *testing.T) {
		assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
	})
}

func TestQueryTSDB_WithCachedPostings(t *testing.T) {
	clu := cluster.New(nil, cluster.SchemaWithTSDB)

	defer func() {
		assert.NoError(t, clu.Cleanup())
	}()

	var (
		tDistributor = clu.AddComponent(
			"distributor",
			"-target=distributor",
		)
		tIndexGateway = clu.AddComponent(
			"index-gateway",
			"-target=index-gateway",
			"-tsdb.enable-postings-cache=true",
			"-store.index-cache-read.cache.enable-fifocache=true",
		)
	)
	require.NoError(t, clu.Run())

	var (
		tIngester = clu.AddComponent(
			"ingester",
			"-target=ingester",
			"-ingester.flush-on-shutdown=true",
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
		tQueryScheduler = clu.AddComponent(
			"query-scheduler",
			"-target=query-scheduler",
			"-query-scheduler.use-scheduler-ring=false",
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
		tCompactor = clu.AddComponent(
			"compactor",
			"-target=compactor",
			"-boltdb.shipper.compactor.compaction-interval=1s",
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
	)
	require.NoError(t, clu.Run())

	// finally, run the query-frontend and querier.
	var (
		tQueryFrontend = clu.AddComponent(
			"query-frontend",
			"-target=query-frontend",
			"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
			"-frontend.default-validity=0s",
			"-common.compactor-address="+tCompactor.HTTPURL(),
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
		_ = clu.AddComponent(
			"querier",
			"-target=querier",
			"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
			"-common.compactor-address="+tCompactor.HTTPURL(),
			"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
		)
	)
	require.NoError(t, clu.Run())

	tenantID := randStringRunes()

	now := time.Now()
	cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
	cliDistributor.Now = now
	cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
	cliIngester.Now = now
	cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
	cliQueryFrontend.Now = now
	cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
	cliIndexGateway.Now = now

	// initial cache state.
	igwMetrics, err := cliIndexGateway.Metrics()
	require.NoError(t, err)
	assertCacheState(t, igwMetrics, &expectedCacheState{
		cacheName: "store.index-cache-read.embedded-cache",
		gets:      0,
		misses:    0,
		added:     0,
	})

	t.Run("ingest-logs", func(t *testing.T) {
		require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-72*time.Hour), map[string]string{"job": "fake"}))
		require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
	})

	// restart ingester which should flush the chunks and index
	require.NoError(t, tIngester.Restart())

Review comment: we could also hit
Reply: this part was just copied from the other tests

	// Query lines
	t.Run("query to verify logs being served from storage", func(t *testing.T) {
		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
		require.NoError(t, err)
		assert.Equal(t, "streams", resp.Data.ResultType)

		var lines []string
		for _, stream := range resp.Data.Stream {
			for _, val := range stream.Values {
				lines = append(lines, val[1])
			}
		}

		assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
	})

	igwMetrics, err = cliIndexGateway.Metrics()
	require.NoError(t, err)
	assertCacheState(t, igwMetrics, &expectedCacheState{
		cacheName: "store.index-cache-read.embedded-cache",
		gets:      50,
		misses:    1,
		added:     1,
	})

	// ingest logs with ts=now.
	require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
	require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))

	// default length is 7 days.
	resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
	require.NoError(t, err)
	assert.Equal(t, "streams", resp.Data.ResultType)

	var lines []string
	for _, stream := range resp.Data.Stream {
		for _, val := range stream.Values {
			lines = append(lines, val[1])
		}
	}
	// expect lines from both, ingesters memory and from the store.
	assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)

}

func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
	for _, m := range mf.Metric {
		if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
			continue
		}

		return m.Counter.GetValue()
	}

	return 0
}

func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {
	var parser expfmt.TextParser
	mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
	require.NoError(t, err)

	lbs := []*dto.LabelPair{
		{
			Name:  proto.String("cache"),
			Value: proto.String(e.cacheName),
		},
	}

	mf, found := mfs["querier_cache_added_new_total"]
	require.True(t, found)
	require.Equal(t, e.added, getValueFromMF(mf, lbs))

	mf, found = mfs["querier_cache_gets_total"]
	require.True(t, found)
	require.Equal(t, e.gets, getValueFromMF(mf, lbs))

	mf, found = mfs["querier_cache_misses_total"]
	require.True(t, found)
	require.Equal(t, e.misses, getValueFromMF(mf, lbs))
}

type expectedCacheState struct {
	cacheName string
	gets      float64
	misses    float64
	added     float64
}
This test passes with this flag set to `false`. Is there any way for this test to fail when caching is not happening? Where are the objects coming from (local filesystem?), and could we change the data at rest to prove the data we're getting is from cache?

I added an assertion that checks the cache added/misses/gets metrics. They guarantee we're covering the cache behavior using the FIFO cache.
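One way to picture why the metric assertion makes the test sensitive to caching: with the postings cache disabled, the `gets` counter for that cache would be expected to stay at zero, so any check that requires hits would fail. The sketch below illustrates this using only the standard library. It is not the helper from this PR (which uses `expfmt`); `counterValue` and `cacheHits` are hypothetical names, the parser assumes samples without timestamps or spaces inside label values, and treating hits as `gets - misses` is an assumption about how these counters relate.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// counterValue scans Prometheus text-format output for a sample of the given
// metric name that carries the label cache="<cache>" and returns its value.
// This is a simplified parser for illustration only.
func counterValue(metrics, name, cache string) (float64, bool) {
	want := fmt.Sprintf("cache=%q", cache)
	sc := bufio.NewScanner(strings.NewReader(metrics))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, name+"{") || !strings.Contains(line, want) {
			continue
		}
		fields := strings.Fields(line)
		v, err := strconv.ParseFloat(fields[len(fields)-1], 64)
		if err != nil {
			return 0, false
		}
		return v, true
	}
	return 0, false
}

// cacheHits derives the number of hits as gets minus misses (an assumption).
func cacheHits(metrics, cache string) (float64, bool) {
	gets, okGets := counterValue(metrics, "querier_cache_gets_total", cache)
	misses, okMisses := counterValue(metrics, "querier_cache_misses_total", cache)
	return gets - misses, okGets && okMisses
}

func main() {
	sample := `# TYPE querier_cache_gets_total counter
querier_cache_gets_total{cache="store.index-cache-read.embedded-cache"} 50
querier_cache_misses_total{cache="store.index-cache-read.embedded-cache"} 1
`
	hits, ok := cacheHits(sample, "store.index-cache-read.embedded-cache")
	fmt.Println(hits, ok)
}
```

A test built on such a helper could require `hits > 0` after the first query, which would not hold if the index gateway never consulted the cache.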