Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

per user index query readiness with limits overrides #5484

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* [5315](https://github.com/grafana/loki/pull/5315) **bboreham**: filters: use faster regexp package
* [5393](https://github.com/grafana/loki/pull/5393) **sandeepsukhani**: jsonnet: move boltdb-shipper configs set as compactor args to yaml config
* [5450](https://github.com/grafana/loki/pull/5450) **BenoitKnecht**: pkg/ruler/base: Add external_labels option
* [5484](https://github.com/grafana/loki/pull/5484) **sandeepsukhani**: Add support for per user index query readiness with limits overrides

# 2.4.1 (2021/11/07)

Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
return nil, err
}

shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, prometheus.DefaultRegisterer)
shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/storage/caching_fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/gcp"
"github.com/grafana/loki/pkg/storage/chunk/testutils"
"github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/validation"
Copy link
Contributor Author

@sandeepsukhani sandeepsukhani Feb 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The github.com/grafana/loki/pkg/util/validation points to a copy of the cortex validation package, which we should clean up sometime.

)

type fixture struct {
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/openstack"
"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ type indexStoreFactories struct {
}

// IndexClientFactoryFunc defines signature of function which creates chunk.IndexClient for managing index in index store
type IndexClientFactoryFunc func() (chunk.IndexClient, error)
type IndexClientFactoryFunc func(limits StoreLimits) (chunk.IndexClient, error)

// TableClientFactoryFunc defines signature of function which creates chunk.TableClient for managing tables in index store
type TableClientFactoryFunc func() (chunk.TableClient, error)
Expand All @@ -72,6 +73,7 @@ func RegisterIndexStore(name string, indexClientFactory IndexClientFactoryFunc,

// StoreLimits helps get Limits specific to Queries for Stores
type StoreLimits interface {
downloads.Limits
CardinalityLimit(userID string) int
MaxChunksPerQueryFromStore(userID string) int
MaxQueryLength(userID string) time.Duration
Expand Down Expand Up @@ -212,7 +214,7 @@ func NewStore(
indexClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{"component": "index-store-" + s.From.String()}, reg)

index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, indexClientReg)
index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, limits, indexClientReg)
if err != nil {
return nil, errors.Wrap(err, "error creating index client")
}
Expand Down Expand Up @@ -243,10 +245,10 @@ func NewStore(
}

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) {
func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, limits StoreLimits, registerer prometheus.Registerer) (chunk.IndexClient, error) {
if indexClientFactory, ok := customIndexStores[name]; ok {
if indexClientFactory.indexClientFactoryFunc != nil {
return indexClientFactory.indexClientFactoryFunc()
return indexClientFactory.indexClientFactoryFunc(limits)
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/chunk/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cassandra"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/validation"
)

func TestFactoryStop(t *testing.T) {
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestCustomIndexClient(t *testing.T) {
{
indexClientName: "boltdb",
indexClientFactories: indexStoreFactories{
indexClientFactoryFunc: func() (client chunk.IndexClient, e error) {
indexClientFactoryFunc: func(_ StoreLimits) (client chunk.IndexClient, e error) {
return newBoltDBCustomIndexClient(cfg.BoltDBConfig)
},
},
Expand All @@ -115,7 +115,7 @@ func TestCustomIndexClient(t *testing.T) {
{
indexClientName: "boltdb",
indexClientFactories: indexStoreFactories{
indexClientFactoryFunc: func() (client chunk.IndexClient, e error) {
indexClientFactoryFunc: func(_ StoreLimits) (client chunk.IndexClient, e error) {
return newBoltDBCustomIndexClient(cfg.BoltDBConfig)
},
tableClientFactoryFunc: func() (client chunk.TableClient, e error) {
Expand All @@ -134,7 +134,7 @@ func TestCustomIndexClient(t *testing.T) {
RegisterIndexStore(tc.indexClientName, tc.indexClientFactories.indexClientFactoryFunc, tc.indexClientFactories.tableClientFactoryFunc)
}

indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, nil)
indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, nil, nil)
if tc.errorExpected {
require.Error(t, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func RegisterCustomIndexClients(cfg *Config, cm storage.ClientMetrics, registere
// in tests for creating multiple instances of it at a time.
var boltDBIndexClientWithShipper chunk.IndexClient

storage.RegisterIndexStore(shipper.BoltDBShipperType, func() (chunk.IndexClient, error) {
storage.RegisterIndexStore(shipper.BoltDBShipperType, func(limits storage.StoreLimits) (chunk.IndexClient, error) {
if boltDBIndexClientWithShipper != nil {
return boltDBIndexClientWithShipper, nil
}
Expand All @@ -445,7 +445,7 @@ func RegisterCustomIndexClients(cfg *Config, cm storage.ClientMetrics, registere
return nil, err
}

boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, registerer)
boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, registerer)

return boltDBIndexClientWithShipper, err
}, func() (client chunk.TableClient, e error) {
Expand Down
5 changes: 0 additions & 5 deletions pkg/storage/stores/shipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ func (t *indexSet) Init() (err error) {
t.dbsMtx.markReady()
}()

startTime := time.Now()

filesInfo, err := ioutil.ReadDir(t.cacheLocation)
if err != nil {
return err
Expand Down Expand Up @@ -148,9 +146,6 @@ func (t *indexSet) Init() (err error) {
return
}

duration := time.Since(startTime).Seconds()
t.metrics.tablesDownloadDurationSeconds.add(t.tableName, duration)

level.Debug(logger).Log("msg", "finished syncing files")

return
Expand Down
37 changes: 6 additions & 31 deletions pkg/storage/stores/shipper/downloads/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package downloads

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -12,42 +10,19 @@ const (
statusSuccess = "success"
)

// downloadTableDurationMetric records the initial download duration of each
// period's table and exposes the sum of all recorded durations via one gauge.
type downloadTableDurationMetric struct {
// guards periods and the gauge update against concurrent add calls
sync.RWMutex
gauge prometheus.Gauge
// per-period download duration in seconds, keyed by period (table) name
periods map[string]float64
}

// add records the download duration (in seconds) for the given period and
// refreshes the gauge with the total across all recorded periods.
func (m *downloadTableDurationMetric) add(period string, downloadDuration float64) {
m.Lock()
defer m.Unlock()
m.periods[period] = downloadDuration

// Recompute the total from scratch so that overwriting an existing
// period's value does not double count it in the gauge.
totalDuration := float64(0)
for _, dur := range m.periods {
totalDuration += dur
}

m.gauge.Set(totalDuration)
}

type metrics struct {
// metrics for measuring performance of downloading of files per period initially i.e for the first time
tablesDownloadDurationSeconds *downloadTableDurationMetric

queryTimeTableDownloadDurationSeconds *prometheus.CounterVec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if an histogram wouldn't be better here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that would add much value. Would we be interested in looking at a distribution? I think it would be rare and we can use quantile_over_time from the logline I have added here. wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the PromQL you'll use with this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like sum(quantile_over_time({container="index-gateway"} |= "downloaded index set at query time" | logfmt | unwrap duration(duration)[5m])) by (user_id). Makes sense?

tablesSyncOperationTotal *prometheus.CounterVec
tablesDownloadOperationDurationSeconds prometheus.Gauge
}

func newMetrics(r prometheus.Registerer) *metrics {
m := &metrics{
tablesDownloadDurationSeconds: &downloadTableDurationMetric{
periods: map[string]float64{},
gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: "loki_boltdb_shipper",
Name: "initial_tables_download_duration_seconds",
Help: "Time (in seconds) spent in downloading of files per table, initially i.e for the first time",
})},
queryTimeTableDownloadDurationSeconds: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_boltdb_shipper",
Name: "query_time_table_download_duration_seconds",
Help: "Time (in seconds) spent in downloading of files per table at query time",
}, []string{"table"}),
tablesSyncOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_boltdb_shipper",
Name: "tables_sync_operation_total",
Expand Down
67 changes: 37 additions & 30 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package downloads
import (
"context"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"sync"
Expand Down Expand Up @@ -32,17 +31,17 @@ type BoltDBIndexClient interface {
QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error
}

type StorageClient interface {
ListTables(ctx context.Context) ([]string, error)
ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, error)
GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error)
GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error)
IsFileNotFoundErr(err error) bool
type Table interface {
Close()
MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error
DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error)
Sync(ctx context.Context) error
EnsureQueryReadiness(ctx context.Context, userIDs []string) error
}

// Table is a collection of multiple files created for a same table by various ingesters.
// table is a collection of multiple files created for a same table by various ingesters.
// All the public methods are concurrency safe and take care of mutexes to avoid any data race.
type Table struct {
type table struct {
name string
cacheLocation string
metrics *metrics
Expand All @@ -56,10 +55,10 @@ type Table struct {
indexSetsMtx sync.RWMutex
}

// NewTable just creates an instance of Table without trying to load files from local storage or object store.
// NewTable just creates an instance of table without trying to load files from local storage or object store.
// It is used for initializing table at query time.
func NewTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) *Table {
table := Table{
func NewTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) Table {
table := table{
name: name,
cacheLocation: cacheLocation,
metrics: metrics,
Expand All @@ -76,7 +75,7 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, boltDBIn

// LoadTable loads a table from local storage(syncs the table too if we have it locally) or downloads it from the shared store.
// It is used for loading and initializing table at startup. It would initialize index sets which already had files locally.
func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) (*Table, error) {
func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) (Table, error) {
err := chunk_util.EnsureDirectory(cacheLocation)
if err != nil {
return nil, err
Expand All @@ -87,7 +86,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
return nil, err
}

table := Table{
table := table{
name: name,
cacheLocation: cacheLocation,
metrics: metrics,
Expand Down Expand Up @@ -139,7 +138,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
}

// Close Closes references to all the dbs.
func (t *Table) Close() {
func (t *table) Close() {
t.indexSetsMtx.Lock()
defer t.indexSetsMtx.Unlock()

Expand All @@ -151,15 +150,15 @@ func (t *Table) Close() {
}

// MultiQueries runs multiple queries without having to take lock multiple times for each query.
func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error {
func (t *table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error {
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

// query both user and common index
for _, uid := range []string{userID, ""} {
indexSet, err := t.getOrCreateIndexSet(uid)
indexSet, err := t.getOrCreateIndexSet(uid, true)
if err != nil {
return err
}
Expand All @@ -186,7 +185,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
return nil
}

func (t *Table) findExpiredIndexSets(ttl time.Duration, now time.Time) []string {
func (t *table) findExpiredIndexSets(ttl time.Duration, now time.Time) []string {
t.indexSetsMtx.RLock()
defer t.indexSetsMtx.RUnlock()

Expand Down Expand Up @@ -216,7 +215,7 @@ func (t *Table) findExpiredIndexSets(ttl time.Duration, now time.Time) []string

// DropUnusedIndex drops the index set if it has not been queried for at least ttl duration.
// It returns true if the whole table gets dropped.
func (t *Table) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) {
func (t *table) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) {
indexSetsToCleanup := t.findExpiredIndexSets(ttl, now)

if len(indexSetsToCleanup) > 0 {
Expand All @@ -239,7 +238,7 @@ func (t *Table) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error)
}

// Sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
func (t *Table) Sync(ctx context.Context) error {
func (t *table) Sync(ctx context.Context) error {
level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.name))

t.indexSetsMtx.RLock()
Expand All @@ -257,7 +256,9 @@ func (t *Table) Sync(ctx context.Context) error {
// getOrCreateIndexSet gets or creates the index set for the userID.
// If it does not exist, it creates a new one and initializes it in a goroutine.
// Caller can use IndexSet.AwaitReady() to wait until the IndexSet gets ready, if required.
func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) {
// forQuerying must be set to true only when getting the index for querying,
// since it captures the amount of time it takes to download the index at query time.
func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, error) {
t.indexSetsMtx.RLock()
indexSet, ok := t.indexSets[id]
t.indexSetsMtx.RUnlock()
Expand Down Expand Up @@ -289,6 +290,15 @@ func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) {

// initialize the index set in async mode, it would be upto the caller to wait for its readiness using IndexSet.AwaitReady()
go func() {
if forQuerying {
start := time.Now()
defer func() {
duration := time.Since(start).Seconds()
t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration)
level.Info(loggerWithUserID(t.logger, id)).Log("msg", "downloaded index set at query time", "duration", duration)
}()
}

err := indexSet.Init()
if err != nil {
level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err)
Expand All @@ -298,13 +308,10 @@ func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) {
return indexSet, nil
}

func (t *Table) EnsureQueryReadiness(ctx context.Context) error {
_, userIDs, err := t.storageClient.ListFiles(ctx, t.name)
if err != nil {
return err
}

commonIndexSet, err := t.getOrCreateIndexSet("")
// EnsureQueryReadiness ensures that we have downloaded the common index as well as user index for the provided userIDs.
// When ensuring query readiness for a table, we will always download common index set because it can include index for one of the provided user ids.
func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) error {
commonIndexSet, err := t.getOrCreateIndexSet("", false)
if err != nil {
return err
}
Expand All @@ -330,9 +337,9 @@ func (t *Table) EnsureQueryReadiness(ctx context.Context) error {
}

// downloadUserIndexes downloads user specific index files concurrently.
func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
indexSet, err := t.getOrCreateIndexSet(userIDs[idx])
indexSet, err := t.getOrCreateIndexSet(userIDs[idx], false)
if err != nil {
return err
}
Expand Down
Loading