Skip to content

Commit

Permalink
Introduces cache to TSDB postings (#9621)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Introduces a new interface for fetching TSDB postings and implements a cached TSDB postings.
The new postings interface is named `PostingsReader` and contains two implementations: a simple one, which is identical to the former Postings behavior, and a new one, that caches postings using the existing `index_queries_cache_config`. By default, the simple/former implementation is used. The new cached implementation can be configured in the following way:
```yaml
storage_config:
  tsdb_shipper:
    enable_postings_cache: true
```

In the future, we'll remove the `enable_postings_cache` flag and change the behavior to cache postings by default.
  • Loading branch information
DylanGuedes authored Aug 3, 2023
1 parent 2fcde18 commit 1221658
Show file tree
Hide file tree
Showing 25 changed files with 1,339 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [9621](https://github.com/grafana/loki/pull/9621) **DylanGuedes**: Introduce TSDB postings cache.
* [10010](https://github.com/grafana/loki/pull/10010) **rasta-rocket**: feat(promtail): retrieve BotTags field from cloudflare
* [9995](https://github.com/grafana/loki/pull/9995) **chaudum**: Add jitter to the flush interval to prevent multiple ingesters to flush at the same time.
* [9797](https://github.com/grafana/loki/pull/9797) **chaudum**: Add new `loki_index_gateway_requests_total` counter metric to observe per-tenant RPS
Expand Down
8 changes: 8 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,9 @@ boltdb_shipper:
# CLI flag: -boltdb.shipper.build-per-tenant-index
[build_per_tenant_index: <boolean> | default = false]

# Configures storing the index in an Object Store
# (GCS/S3/Azure/Swift/COS/Filesystem) in a Prometheus TSDB-like format. The
# required fields apply only when TSDB is defined in the config.
tsdb_shipper:
# Directory where ingesters would write index files which would then be
# uploaded by shipper to configured storage
Expand Down Expand Up @@ -2037,6 +2040,11 @@ tsdb_shipper:
[mode: <string> | default = ""]

[ingesterdbretainperiod: <duration>]

# Experimental. Whether TSDB should cache postings or not. The
# index-read-cache will be used as the backend.
# CLI flag: -tsdb.enable-postings-cache
[enable_postings_cache: <boolean> | default = false]
```
### chunk_store_config
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ require (
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/oauth2 v0.10.0
golang.org/x/text v0.11.0
google.golang.org/protobuf v1.31.0
)

require (
Expand Down Expand Up @@ -310,7 +311,6 @@ require (
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
186 changes: 186 additions & 0 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package integration

import (
"context"
"strings"
"testing"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
Expand Down Expand Up @@ -567,3 +571,185 @@ func TestSchedulerRing(t *testing.T) {
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
}

// TestQueryTSDB_WithCachedPostings spins up a micro-services Loki cluster with
// the TSDB postings cache enabled on the index-gateway
// (-tsdb.enable-postings-cache=true) and verifies, via the gateway's
// querier_cache_* metrics, that postings are actually added to and read from
// the embedded index-read cache when queries hit the store.
func TestQueryTSDB_WithCachedPostings(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDB)

defer func() {
assert.NoError(t, clu.Cleanup())
}()

// Phase 1: start the distributor and the index-gateway first. The gateway is
// configured with the postings cache backed by the in-process fifocache so
// its cache metrics can be scraped below.
var (
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
"-tsdb.enable-postings-cache=true",
"-store.index-cache-read.cache.enable-fifocache=true",
)
)
require.NoError(t, clu.Run())

// Phase 2: components that need the index-gateway's gRPC address, so they
// are added only after the gateway is running.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-ingester.flush-on-shutdown=true",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// A random tenant isolates this test's data from other tests sharing state.
tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now

// initial cache state.
// Before any data or queries, the postings cache must be completely cold.
igwMetrics, err := cliIndexGateway.Metrics()
require.NoError(t, err)
assertCacheState(t, igwMetrics, &expectedCacheState{
cacheName: "store.index-cache-read.embedded-cache",
gets: 0,
misses: 0,
added: 0,
})

// Push lines with old timestamps (72h/48h ago) so they land outside the
// ingester's in-memory window and must be served from storage after flush.
t.Run("ingest-logs", func(t *testing.T) {
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-72*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
})

// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())

// Query lines
t.Run("query to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}

assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
})

// After one storage-backed query the cache should show activity: one miss
// that caused one new entry, plus subsequent gets. NOTE(review): the exact
// gets count (50) presumably depends on query sharding/parallelism defaults;
// it is pinned here as a regression value.
igwMetrics, err = cliIndexGateway.Metrics()
require.NoError(t, err)
assertCacheState(t, igwMetrics, &expectedCacheState{
cacheName: "store.index-cache-read.embedded-cache",
gets: 50,
misses: 1,
added: 1,
})

// ingest logs with ts=now.
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))

// default length is 7 days.
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
// expect lines from both, ingesters memory and from the store.
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)

}

// getValueFromMF returns the counter value of the first metric in mf whose
// label pairs deep-equal lbs. It returns 0 when no metric carries that exact
// label set, which is indistinguishable from a matching counter at zero.
func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
	for _, metric := range mf.Metric {
		if assert.ObjectsAreEqualValues(lbs, metric.GetLabel()) {
			return metric.Counter.GetValue()
		}
	}

	return 0
}

// assertCacheState parses Prometheus text-format metrics and requires that
// the cache named e.cacheName reports exactly the expected added/gets/misses
// counter values.
func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {
	var parser expfmt.TextParser
	mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
	require.NoError(t, err)

	// Only metrics labeled with this cache name are considered.
	lbs := []*dto.LabelPair{
		{
			Name:  proto.String("cache"),
			Value: proto.String(e.cacheName),
		},
	}

	checks := []struct {
		family string
		want   float64
	}{
		{family: "querier_cache_added_new_total", want: e.added},
		{family: "querier_cache_gets_total", want: e.gets},
		{family: "querier_cache_misses_total", want: e.misses},
	}
	for _, c := range checks {
		mf, found := mfs[c.family]
		require.True(t, found)
		require.Equal(t, c.want, getValueFromMF(mf, lbs))
	}
}

// expectedCacheState holds the querier_cache_* counter values a test expects
// to observe for one named cache instance; consumed by assertCacheState.
type expectedCacheState struct {
// cacheName is the value of the "cache" label on the metrics to inspect.
cacheName string
// gets is the expected querier_cache_gets_total counter value.
gets float64
// misses is the expected querier_cache_misses_total counter value.
misses float64
// added is the expected querier_cache_added_new_total counter value.
added float64
}
7 changes: 4 additions & 3 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/tsdb"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -283,9 +284,9 @@ type Config struct {
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"`

MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper"`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig shipper.Config `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
TSDBShipperConfig tsdb.IndexCfg `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
indexClientLogger := log.With(s.logger, "index-store", fmt.Sprintf("%s-%s", p.IndexType, p.From.String()))

if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig.Config) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger)
if err != nil {
Expand Down Expand Up @@ -272,7 +272,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
}

indexReaderWriter, stopTSDBStoreFunc, err := tsdb.NewStore(fmt.Sprintf("%s_%s", p.ObjectType, p.From.String()), s.cfg.TSDBShipperConfig, s.schemaCfg, f, objectClient, s.limits,
tableRange, backupIndexWriter, indexClientReg, indexClientLogger)
tableRange, backupIndexWriter, indexClientReg, indexClientLogger, s.indexReadCache)
if err != nil {
return nil, nil, nil, err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/tsdb"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
"github.com/grafana/loki/pkg/validation"
Expand Down Expand Up @@ -1005,7 +1006,7 @@ func TestStore_indexPrefixChange(t *testing.T) {

cfg := Config{
FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")},
TSDBShipperConfig: shipperConfig,
TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig},
NamedStores: NamedStores{
Filesystem: map[string]NamedFSConfig{
"named-store": {Directory: path.Join(tempDir, "named-store")},
Expand Down Expand Up @@ -1166,7 +1167,7 @@ func TestStore_MultiPeriod(t *testing.T) {
BoltDBShipperConfig: shipper.Config{
Config: shipperConfig,
},
TSDBShipperConfig: shipperConfig,
TSDBShipperConfig: tsdb.IndexCfg{Config: shipperConfig, CachePostings: false},
NamedStores: NamedStores{
Filesystem: map[string]NamedFSConfig{
"named-store": {Directory: path.Join(tempDir, "named-store")},
Expand Down Expand Up @@ -1479,7 +1480,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {
cfg := Config{
FSConfig: local.FSConfig{Directory: path.Join(tempDir, "chunks")},
BoltDBShipperConfig: boltdbShipperConfig,
TSDBShipperConfig: tsdbShipperConfig,
TSDBShipperConfig: tsdb.IndexCfg{Config: tsdbShipperConfig},
}

schemaConfig := config.SchemaConfig{
Expand Down
Loading

0 comments on commit 1221658

Please sign in to comment.