diff --git a/CHANGELOG.md b/CHANGELOG.md index e9e6155f3bfdb..ef622bc5af444 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ * [5315](https://github.com/grafana/loki/pull/5315) **bboreham**: filters: use faster regexp package * [5393](https://github.com/grafana/loki/pull/5393) **sandeepsukhani**: jsonnet: move boltdb-shipper configs set as compactor args to yaml config * [5450](https://github.com/grafana/loki/pull/5450) **BenoitKnecht**: pkg/ruler/base: Add external_labels option +* [5484](https://github.com/grafana/loki/pull/5484) **sandeepsukhani**: Add support for per user index query readiness with limits overrides # 2.4.1 (2021/11/07) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1891f12f28f08..009fe58967995 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -727,7 +727,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) { return nil, err } - shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, prometheus.DefaultRegisterer) + shipperIndexClient, err := shipper.NewShipper(t.Cfg.StorageConfig.BoltDBShipperConfig, objectClient, t.overrides, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/storage/chunk/storage/caching_fixtures.go b/pkg/storage/chunk/storage/caching_fixtures.go index 4c5320f264567..0f596fc61e0ff 100644 --- a/pkg/storage/chunk/storage/caching_fixtures.go +++ b/pkg/storage/chunk/storage/caching_fixtures.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/gcp" "github.com/grafana/loki/pkg/storage/chunk/testutils" - "github.com/grafana/loki/pkg/util/validation" + "github.com/grafana/loki/pkg/validation" ) type fixture struct { diff --git a/pkg/storage/chunk/storage/factory.go b/pkg/storage/chunk/storage/factory.go index fa977f6231ede..b016d1118a2da 100644 --- a/pkg/storage/chunk/storage/factory.go +++ b/pkg/storage/chunk/storage/factory.go @@ -23,6 
+23,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/openstack" + "github.com/grafana/loki/pkg/storage/stores/shipper/downloads" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -57,7 +58,7 @@ type indexStoreFactories struct { } // IndexClientFactoryFunc defines signature of function which creates chunk.IndexClient for managing index in index store -type IndexClientFactoryFunc func() (chunk.IndexClient, error) +type IndexClientFactoryFunc func(limits StoreLimits) (chunk.IndexClient, error) // TableClientFactoryFunc defines signature of function which creates chunk.TableClient for managing tables in index store type TableClientFactoryFunc func() (chunk.TableClient, error) @@ -72,6 +73,7 @@ func RegisterIndexStore(name string, indexClientFactory IndexClientFactoryFunc, // StoreLimits helps get Limits specific to Queries for Stores type StoreLimits interface { + downloads.Limits CardinalityLimit(userID string) int MaxChunksPerQueryFromStore(userID string) int MaxQueryLength(userID string) time.Duration @@ -212,7 +214,7 @@ func NewStore( indexClientReg := prometheus.WrapRegistererWith( prometheus.Labels{"component": "index-store-" + s.From.String()}, reg) - index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, indexClientReg) + index, err := NewIndexClient(s.IndexType, cfg, schemaCfg, limits, indexClientReg) if err != nil { return nil, errors.Wrap(err, "error creating index client") } @@ -243,10 +245,10 @@ func NewStore( } // NewIndexClient makes a new index client of the desired type. 
-func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, registerer prometheus.Registerer) (chunk.IndexClient, error) { +func NewIndexClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, limits StoreLimits, registerer prometheus.Registerer) (chunk.IndexClient, error) { if indexClientFactory, ok := customIndexStores[name]; ok { if indexClientFactory.indexClientFactoryFunc != nil { - return indexClientFactory.indexClientFactoryFunc() + return indexClientFactory.indexClientFactoryFunc(limits) } } diff --git a/pkg/storage/chunk/storage/factory_test.go b/pkg/storage/chunk/storage/factory_test.go index fca73f8a42d01..a3c4cf9e5a5da 100644 --- a/pkg/storage/chunk/storage/factory_test.go +++ b/pkg/storage/chunk/storage/factory_test.go @@ -14,7 +14,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/cassandra" "github.com/grafana/loki/pkg/storage/chunk/local" - "github.com/grafana/loki/pkg/util/validation" + "github.com/grafana/loki/pkg/validation" ) func TestFactoryStop(t *testing.T) { @@ -95,7 +95,7 @@ func TestCustomIndexClient(t *testing.T) { { indexClientName: "boltdb", indexClientFactories: indexStoreFactories{ - indexClientFactoryFunc: func() (client chunk.IndexClient, e error) { + indexClientFactoryFunc: func(_ StoreLimits) (client chunk.IndexClient, e error) { return newBoltDBCustomIndexClient(cfg.BoltDBConfig) }, }, @@ -115,7 +115,7 @@ func TestCustomIndexClient(t *testing.T) { { indexClientName: "boltdb", indexClientFactories: indexStoreFactories{ - indexClientFactoryFunc: func() (client chunk.IndexClient, e error) { + indexClientFactoryFunc: func(_ StoreLimits) (client chunk.IndexClient, e error) { return newBoltDBCustomIndexClient(cfg.BoltDBConfig) }, tableClientFactoryFunc: func() (client chunk.TableClient, e error) { @@ -134,7 +134,7 @@ func TestCustomIndexClient(t *testing.T) { RegisterIndexStore(tc.indexClientName, tc.indexClientFactories.indexClientFactoryFunc, 
tc.indexClientFactories.tableClientFactoryFunc) } - indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, nil) + indexClient, err := NewIndexClient(tc.indexClientName, cfg, schemaCfg, nil, nil) if tc.errorExpected { require.Error(t, err) } else { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 1f5ec9fac7594..8145de1c00c49 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -425,7 +425,7 @@ func RegisterCustomIndexClients(cfg *Config, cm storage.ClientMetrics, registere // in tests for creating multiple instances of it at a time. var boltDBIndexClientWithShipper chunk.IndexClient - storage.RegisterIndexStore(shipper.BoltDBShipperType, func() (chunk.IndexClient, error) { + storage.RegisterIndexStore(shipper.BoltDBShipperType, func(limits storage.StoreLimits) (chunk.IndexClient, error) { if boltDBIndexClientWithShipper != nil { return boltDBIndexClientWithShipper, nil } @@ -445,7 +445,7 @@ func RegisterCustomIndexClients(cfg *Config, cm storage.ClientMetrics, registere return nil, err } - boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, registerer) + boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, registerer) return boltDBIndexClientWithShipper, err }, func() (client chunk.TableClient, e error) { diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index f7be41b6099b1..0c28985e4889f 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -110,8 +110,6 @@ func (t *indexSet) Init() (err error) { t.dbsMtx.markReady() }() - startTime := time.Now() - filesInfo, err := ioutil.ReadDir(t.cacheLocation) if err != nil { return err @@ -148,9 +146,6 @@ func (t *indexSet) Init() (err error) { return } - duration := time.Since(startTime).Seconds() - t.metrics.tablesDownloadDurationSeconds.add(t.tableName, duration) 
- level.Debug(logger).Log("msg", "finished syncing files") return diff --git a/pkg/storage/stores/shipper/downloads/metrics.go b/pkg/storage/stores/shipper/downloads/metrics.go index 22b4eac7a10b7..aff0c93bc5be8 100644 --- a/pkg/storage/stores/shipper/downloads/metrics.go +++ b/pkg/storage/stores/shipper/downloads/metrics.go @@ -1,8 +1,6 @@ package downloads import ( - "sync" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -12,42 +10,19 @@ const ( statusSuccess = "success" ) -type downloadTableDurationMetric struct { - sync.RWMutex - gauge prometheus.Gauge - periods map[string]float64 -} - -func (m *downloadTableDurationMetric) add(period string, downloadDuration float64) { - m.Lock() - defer m.Unlock() - m.periods[period] = downloadDuration - - totalDuration := float64(0) - for _, dur := range m.periods { - totalDuration += dur - } - - m.gauge.Set(totalDuration) -} - type metrics struct { - // metrics for measuring performance of downloading of files per period initially i.e for the first time - tablesDownloadDurationSeconds *downloadTableDurationMetric - + queryTimeTableDownloadDurationSeconds *prometheus.CounterVec tablesSyncOperationTotal *prometheus.CounterVec tablesDownloadOperationDurationSeconds prometheus.Gauge } func newMetrics(r prometheus.Registerer) *metrics { m := &metrics{ - tablesDownloadDurationSeconds: &downloadTableDurationMetric{ - periods: map[string]float64{}, - gauge: promauto.With(r).NewGauge(prometheus.GaugeOpts{ - Namespace: "loki_boltdb_shipper", - Name: "initial_tables_download_duration_seconds", - Help: "Time (in seconds) spent in downloading of files per table, initially i.e for the first time", - })}, + queryTimeTableDownloadDurationSeconds: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki_boltdb_shipper", + Name: "query_time_table_download_duration_seconds", + Help: "Time (in seconds) spent in downloading of files per table at query time", + }, 
[]string{"table"}), tablesSyncOperationTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: "loki_boltdb_shipper", Name: "tables_sync_operation_total", diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 7c53ce0367681..02f07a6cee775 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -3,7 +3,6 @@ package downloads import ( "context" "fmt" - "io" "io/ioutil" "path/filepath" "sync" @@ -32,17 +31,17 @@ type BoltDBIndexClient interface { QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback chunk.QueryPagesCallback) error } -type StorageClient interface { - ListTables(ctx context.Context) ([]string, error) - ListFiles(ctx context.Context, tableName string) ([]storage.IndexFile, error) - GetFile(ctx context.Context, tableName, fileName string) (io.ReadCloser, error) - GetUserFile(ctx context.Context, tableName, userID, fileName string) (io.ReadCloser, error) - IsFileNotFoundErr(err error) bool +type Table interface { + Close() + MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error + DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) + Sync(ctx context.Context) error + EnsureQueryReadiness(ctx context.Context, userIDs []string) error } -// Table is a collection of multiple files created for a same table by various ingesters. +// table is a collection of multiple files created for a same table by various ingesters. // All the public methods are concurrency safe and take care of mutexes to avoid any data race. -type Table struct { +type table struct { name string cacheLocation string metrics *metrics @@ -56,10 +55,10 @@ type Table struct { indexSetsMtx sync.RWMutex } -// NewTable just creates an instance of Table without trying to load files from local storage or object store. 
+// NewTable just creates an instance of table without trying to load files from local storage or object store. // It is used for initializing table at query time. -func NewTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) *Table { - table := Table{ +func NewTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) Table { + table := table{ name: name, cacheLocation: cacheLocation, metrics: metrics, @@ -76,7 +75,7 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, boltDBIn // LoadTable loads a table from local storage(syncs the table too if we have it locally) or downloads it from the shared store. // It is used for loading and initializing table at startup. It would initialize index sets which already had files locally. -func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) (*Table, error) { +func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBIndexClient BoltDBIndexClient, metrics *metrics) (Table, error) { err := chunk_util.EnsureDirectory(cacheLocation) if err != nil { return nil, err @@ -87,7 +86,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI return nil, err } - table := Table{ + table := table{ name: name, cacheLocation: cacheLocation, metrics: metrics, @@ -139,7 +138,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI } // Close Closes references to all the dbs. -func (t *Table) Close() { +func (t *table) Close() { t.indexSetsMtx.Lock() defer t.indexSetsMtx.Unlock() @@ -151,7 +150,7 @@ func (t *Table) Close() { } // MultiQueries runs multiple queries without having to take lock multiple times for each query. 
-func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { +func (t *table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { userID, err := tenant.TenantID(ctx) if err != nil { return err @@ -159,7 +158,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca // query both user and common index for _, uid := range []string{userID, ""} { - indexSet, err := t.getOrCreateIndexSet(uid) + indexSet, err := t.getOrCreateIndexSet(uid, true) if err != nil { return err } @@ -186,7 +185,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca return nil } -func (t *Table) findExpiredIndexSets(ttl time.Duration, now time.Time) []string { +func (t *table) findExpiredIndexSets(ttl time.Duration, now time.Time) []string { t.indexSetsMtx.RLock() defer t.indexSetsMtx.RUnlock() @@ -216,7 +215,7 @@ func (t *Table) findExpiredIndexSets(ttl time.Duration, now time.Time) []string // DropUnusedIndex drops the index set if it has not been queried for at least ttl duration. // It returns true if the whole table gets dropped. 
-func (t *Table) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) { +func (t *table) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) { indexSetsToCleanup := t.findExpiredIndexSets(ttl, now) if len(indexSetsToCleanup) > 0 { @@ -239,7 +238,7 @@ func (t *Table) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) } // Sync downloads updated and new files from the storage relevant for the table and removes the deleted ones -func (t *Table) Sync(ctx context.Context) error { +func (t *table) Sync(ctx context.Context) error { level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.name)) t.indexSetsMtx.RLock() @@ -257,7 +256,9 @@ func (t *Table) Sync(ctx context.Context) error { // getOrCreateIndexSet gets or creates the index set for the userID. // If it does not exist, it creates a new one and initializes it in a goroutine. // Caller can use IndexSet.AwaitReady() to wait until the IndexSet gets ready, if required. -func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) { +// forQuerying must be set to true only when getting the index for querying since +// it captures the amount of time it takes to download the index at query time. 
+func (t *table) getOrCreateIndexSet(id string, forQuerying bool) (IndexSet, error) { t.indexSetsMtx.RLock() indexSet, ok := t.indexSets[id] t.indexSetsMtx.RUnlock() @@ -289,6 +290,15 @@ func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) { // initialize the index set in async mode, it would be upto the caller to wait for its readiness using IndexSet.AwaitReady() go func() { + if forQuerying { + start := time.Now() + defer func() { + duration := time.Since(start).Seconds() + t.metrics.queryTimeTableDownloadDurationSeconds.WithLabelValues(t.name).Add(duration) + level.Info(loggerWithUserID(t.logger, id)).Log("msg", "downloaded index set at query time", "duration", duration) + }() + } + err := indexSet.Init() if err != nil { level.Error(t.logger).Log("msg", fmt.Sprintf("failed to init user index set %s", id), "err", err) @@ -298,13 +308,10 @@ func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) { return indexSet, nil } -func (t *Table) EnsureQueryReadiness(ctx context.Context) error { - _, userIDs, err := t.storageClient.ListFiles(ctx, t.name) - if err != nil { - return err - } - - commonIndexSet, err := t.getOrCreateIndexSet("") +// EnsureQueryReadiness ensures that we have downloaded the common index as well as user index for the provided userIDs. +// When ensuring query readiness for a table, we will always download common index set because it can include index for one of the provided user ids. +func (t *table) EnsureQueryReadiness(ctx context.Context, userIDs []string) error { + commonIndexSet, err := t.getOrCreateIndexSet("", false) if err != nil { return err } @@ -330,9 +337,9 @@ func (t *Table) EnsureQueryReadiness(ctx context.Context) error { } // downloadUserIndexes downloads user specific index files concurrently. 
-func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error { +func (t *table) downloadUserIndexes(ctx context.Context, userIDs []string) error { return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error { - indexSet, err := t.getOrCreateIndexSet(userIDs[idx]) + indexSet, err := t.getOrCreateIndexSet(userIDs[idx], false) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go index 90dbe52fbf830..4581f61b952f2 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/downloads/table_manager.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/validation" ) const ( @@ -25,11 +26,18 @@ const ( durationDay = 24 * time.Hour ) +type Limits interface { + AllByUserID() map[string]*validation.Limits + DefaultLimits() *validation.Limits + QueryReadyIndexNumDays(userID string) int +} + type Config struct { CacheDir string SyncInterval time.Duration CacheTTL time.Duration QueryReadyNumDays int + Limits Limits } type TableManager struct { @@ -37,7 +45,7 @@ type TableManager struct { boltIndexClient BoltDBIndexClient indexStorageClient storage.Client - tables map[string]*Table + tables map[string]Table tablesMtx sync.RWMutex metrics *metrics @@ -56,7 +64,7 @@ func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorage cfg: cfg, boltIndexClient: boltIndexClient, indexStorageClient: indexStorageClient, - tables: make(map[string]*Table), + tables: make(map[string]Table), metrics: newMetrics(registerer), ctx: ctx, cancel: cancel, @@ -71,7 +79,7 @@ func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorage } // download the missing tables. 
- err = tm.ensureQueryReadiness() + err = tm.ensureQueryReadiness(ctx) if err != nil { // call Stop to close open file references. tm.Stop() @@ -101,7 +109,7 @@ func (tm *TableManager) loop() { } // we need to keep ensuring query readiness to download every days new table which would otherwise be downloaded only during queries. - err = tm.ensureQueryReadiness() + err = tm.ensureQueryReadiness(tm.ctx) if err != nil { level.Error(util_log.Logger).Log("msg", "error ensuring query readiness of tables", "err", err) } @@ -152,7 +160,7 @@ func (tm *TableManager) query(ctx context.Context, tableName string, queries []c return util.DoParallelQueries(ctx, table, queries, callback) } -func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) { +func (tm *TableManager) getOrCreateTable(tableName string) (Table, error) { // if table is already there, use it. tm.tablesMtx.RLock() table, ok := tm.tables[tableName] @@ -232,97 +240,107 @@ func (tm *TableManager) cleanupCache() error { } // ensureQueryReadiness compares tables required for being query ready with the tables we already have and downloads the missing ones. 
-func (tm *TableManager) ensureQueryReadiness() error { - if tm.cfg.QueryReadyNumDays == 0 { +func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error { + activeTableNumber := getActiveTableNumber() + + // find the largest query readiness number + largestQueryReadinessNum := tm.cfg.QueryReadyNumDays + if defaultLimits := tm.cfg.Limits.DefaultLimits(); defaultLimits.QueryReadyIndexNumDays > largestQueryReadinessNum { + largestQueryReadinessNum = defaultLimits.QueryReadyIndexNumDays + } + + queryReadinessNumByUserID := make(map[string]int) + for userID, limits := range tm.cfg.Limits.AllByUserID() { + if limits.QueryReadyIndexNumDays != 0 { + queryReadinessNumByUserID[userID] = limits.QueryReadyIndexNumDays + if limits.QueryReadyIndexNumDays > largestQueryReadinessNum { + largestQueryReadinessNum = limits.QueryReadyIndexNumDays + } + } + } + + // return early if no table has to be downloaded for query readiness + if largestQueryReadinessNum == 0 { return nil } - tableNames, err := tm.indexStorageClient.ListTables(context.Background()) + tables, err := tm.indexStorageClient.ListTables(ctx) if err != nil { return err } - // get the names of tables required for being query ready. - tableNames, err = tm.tablesRequiredForQueryReadiness(tableNames) + // regex for finding daily tables which have a 5 digit number at the end. 
+ re, err := regexp.Compile(`.+[0-9]{5}$`) if err != nil { return err } - level.Debug(util_log.Logger).Log("msg", fmt.Sprintf("list of tables required for query-readiness %s", tableNames)) - - for _, tableName := range tableNames { - tm.tablesMtx.RLock() - table, ok := tm.tables[tableName] - tm.tablesMtx.RUnlock() - if ok { - err = table.EnsureQueryReadiness(context.Background()) - if err != nil { - return err - } + for _, tableName := range tables { + if !re.MatchString(tableName) { continue } - level.Info(util_log.Logger).Log("msg", "table required for query readiness does not exist locally, downloading it", "table-name", tableName) - // table doesn't exist, download it. - tablePath := filepath.Join(tm.cfg.CacheDir, tableName) - err = chunk_util.EnsureDirectory(tablePath) + tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64) if err != nil { return err } - table, err = LoadTable(tableName, filepath.Join(tm.cfg.CacheDir, tableName), tm.indexStorageClient, tm.boltIndexClient, tm.metrics) + // continue if the table is not within query readiness + if activeTableNumber-tableNumber > int64(largestQueryReadinessNum) { + continue + } + + // list the users that have dedicated index files for this table + _, usersWithIndex, err := tm.indexStorageClient.ListFiles(ctx, tableName) if err != nil { return err } - tm.tablesMtx.Lock() - tm.tables[tableName] = table - tm.tablesMtx.Unlock() + // find the users whose index we need to keep ready for querying from this table + usersToBeQueryReadyFor := tm.findUsersInTableForQueryReadiness(tableNumber, usersWithIndex, queryReadinessNumByUserID) - err = table.EnsureQueryReadiness(context.Background()) + // continue if both user index and common index is not required to be downloaded for query readiness + if len(usersToBeQueryReadyFor) == 0 && activeTableNumber-tableNumber > int64(tm.cfg.QueryReadyNumDays) { + continue + } + + table, err := tm.getOrCreateTable(tableName) if err != nil { return err } + + if err := 
table.EnsureQueryReadiness(ctx, usersToBeQueryReadyFor); err != nil { + return err + } } return nil } -// queryReadyTableNumbersRange returns the table numbers range. Table numbers are added as suffix to table names. -func (tm *TableManager) queryReadyTableNumbersRange() (int64, int64) { - newestTableNumber := getActiveTableNumber() +// findUsersInTableForQueryReadiness returns the users that need their index to be query ready based on the tableNumber and +// query readiness number provided per user +func (tm *TableManager) findUsersInTableForQueryReadiness(tableNumber int64, usersWithIndexInTable []string, + queryReadinessNumByUserID map[string]int) []string { + activeTableNumber := getActiveTableNumber() + usersToBeQueryReadyFor := []string{} - return newestTableNumber - int64(tm.cfg.QueryReadyNumDays), newestTableNumber -} - -// tablesRequiredForQueryReadiness returns the names of tables required to be downloaded for being query ready as per configured QueryReadyNumDays. -// It only considers daily tables for simplicity and we anyways have made it mandatory to have daily tables with boltdb-shipper. -func (tm *TableManager) tablesRequiredForQueryReadiness(tablesInStorage []string) ([]string, error) { - // regex for finding daily tables which have a 5 digit number at the end. 
- re, err := regexp.Compile(`.+[0-9]{5}$`) - if err != nil { - return nil, err - } - - minTableNumber, maxTableNumber := tm.queryReadyTableNumbersRange() - var requiredTableNames []string - - for _, tableName := range tablesInStorage { - if !re.MatchString(tableName) { - continue + for _, userID := range usersWithIndexInTable { + // use the query readiness config for the user if it exists or use the default config + queryReadyNumDays, ok := queryReadinessNumByUserID[userID] + if !ok { + queryReadyNumDays = tm.cfg.Limits.DefaultLimits().QueryReadyIndexNumDays } - tableNumber, err := strconv.ParseInt(tableName[len(tableName)-5:], 10, 64) - if err != nil { - return nil, err + if queryReadyNumDays == 0 { + continue } - if minTableNumber <= tableNumber && tableNumber <= maxTableNumber { - requiredTableNames = append(requiredTableNames, tableName) + if activeTableNumber-tableNumber <= int64(queryReadyNumDays) { + usersToBeQueryReadyFor = append(usersToBeQueryReadyFor, userID) } } - return requiredTableNames, nil + return usersToBeQueryReadyFor } // loadLocalTables loads tables present locally. 
diff --git a/pkg/storage/stores/shipper/downloads/table_manager_test.go b/pkg/storage/stores/shipper/downloads/table_manager_test.go index e4786932d79fa..9f9d1062c54af 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/shipper/downloads/table_manager_test.go @@ -3,17 +3,16 @@ package downloads import ( "context" "fmt" - "math" "path/filepath" "testing" "time" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/storage/chunk" - "github.com/grafana/loki/pkg/storage/chunk/util" + "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" + "github.com/grafana/loki/pkg/validation" ) func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc) { @@ -24,6 +23,7 @@ func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc) CacheDir: cachePath, SyncInterval: time.Hour, CacheTTL: time.Hour, + Limits: &mockLimits{}, } tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil) require.NoError(t, err) @@ -70,34 +70,21 @@ func TestTableManager_cleanupCache(t *testing.T) { expiredTableName := "expired-table" nonExpiredTableName := "non-expired-table" - // query for above 2 tables which should set them up in table manager - err := tableManager.QueryPages(user.InjectOrgID(context.Background(), "fake"), []chunk.IndexQuery{ - {TableName: expiredTableName}, - {TableName: nonExpiredTableName}, - }, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool { - return true - }) - - require.NoError(t, err) - // table manager should now have 2 tables. - require.Len(t, tableManager.tables, 2) + tableManager.tables[expiredTableName] = &mockTable{} + tableManager.tables[nonExpiredTableName] = &mockTable{} // call cleanupCache and verify that no tables are cleaned up because they are not yet expired. 
require.NoError(t, tableManager.cleanupCache()) require.Len(t, tableManager.tables, 2) - // change the last used at time of expiredTable to before the ttl. - expiredTable, ok := tableManager.tables[expiredTableName] - require.True(t, ok) - for _, idxSet := range expiredTable.indexSets { - idxSet.(*indexSet).lastUsedAt = time.Now().Add(-(tableManager.cfg.CacheTTL + time.Minute)) - } + // set the flag for expiredTable to expire. + tableManager.tables[expiredTableName].(*mockTable).tableExpired = true // call the cleanupCache and verify that we still have nonExpiredTable and expiredTable is gone. require.NoError(t, tableManager.cleanupCache()) require.Len(t, tableManager.tables, 1) - _, ok = tableManager.tables[expiredTableName] + _, ok := tableManager.tables[expiredTableName] require.False(t, ok) _, ok = tableManager.tables[nonExpiredTableName] @@ -105,120 +92,222 @@ func TestTableManager_cleanupCache(t *testing.T) { } func TestTableManager_ensureQueryReadiness(t *testing.T) { + activeTableNumber := getActiveTableNumber() + mockIndexStorageClient := &mockIndexStorageClient{ + userIndexesInTables: map[string][]string{}, + } + + cfg := Config{ + SyncInterval: time.Hour, + CacheTTL: time.Hour, + } + + tableManager := &TableManager{ + cfg: cfg, + indexStorageClient: mockIndexStorageClient, + tables: make(map[string]Table), + metrics: newMetrics(nil), + ctx: context.Background(), + cancel: func() {}, + } + + buildTableName := func(idx int) string { + return fmt.Sprintf("table_%d", activeTableNumber-int64(idx)) + } + + // setup 10 tables with 5 latest tables having user index for user1 and user2 + for i := 0; i < 10; i++ { + tableName := buildTableName(i) + tableManager.tables[tableName] = &mockTable{} + mockIndexStorageClient.tablesInStorage = append(mockIndexStorageClient.tablesInStorage, tableName) + if i < 5 { + mockIndexStorageClient.userIndexesInTables[tableName] = []string{"user1", "user2"} + } + } + + // function for resetting state of mockTables + resetTables := 
func() { + for _, table := range tableManager.tables { + table.(*mockTable).queryReadinessDoneForUsers = nil + } + } + for _, tc := range []struct { name string queryReadyNumDaysCfg int + queryReadinessLimits mockLimits + + expectedQueryReadinessDoneForUsers map[string][]string }{ { - name: "0 queryReadyNumDaysCfg with 10 tables in storage", + name: "no query readiness configured", + queryReadinessLimits: mockLimits{}, }, { - name: "5 queryReadyNumDaysCfg with 10 tables in storage", + name: "common index: 5 days", queryReadyNumDaysCfg: 5, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {}, + buildTableName(1): {}, + buildTableName(2): {}, + buildTableName(3): {}, + buildTableName(4): {}, + buildTableName(5): {}, // NOTE: we include an extra table since we are counting days back from current point in time + }, }, { - name: "20 queryReadyNumDaysCfg with 10 tables in storage", + name: "common index: 20 days", queryReadyNumDaysCfg: 20, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {}, + buildTableName(1): {}, + buildTableName(2): {}, + buildTableName(3): {}, + buildTableName(4): {}, + buildTableName(5): {}, + buildTableName(6): {}, + buildTableName(7): {}, + buildTableName(8): {}, + buildTableName(9): {}, + }, + }, + { + name: "user index default: 2 days", + queryReadinessLimits: mockLimits{ + queryReadyIndexNumDaysDefault: 2, + }, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {"user1", "user2"}, + buildTableName(1): {"user1", "user2"}, + buildTableName(2): {"user1", "user2"}, + }, + }, + { + name: "common index: 5 days, user index default: 2 days", + queryReadinessLimits: mockLimits{ + queryReadyIndexNumDaysDefault: 2, + }, + queryReadyNumDaysCfg: 5, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {"user1", "user2"}, + buildTableName(1): {"user1", "user2"}, + buildTableName(2): {"user1", "user2"}, + buildTableName(3): {}, + 
buildTableName(4): {}, + buildTableName(5): {}, + }, + }, + { + name: "user1: 2 days", + queryReadinessLimits: mockLimits{ + queryReadyIndexNumDaysByUser: map[string]int{"user1": 2}, + }, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {"user1"}, + buildTableName(1): {"user1"}, + buildTableName(2): {"user1"}, + }, + }, + { + name: "user1: 2 days, user2: 20 days", + queryReadinessLimits: mockLimits{ + queryReadyIndexNumDaysByUser: map[string]int{"user1": 2, "user2": 20}, + }, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {"user1", "user2"}, + buildTableName(1): {"user1", "user2"}, + buildTableName(2): {"user1", "user2"}, + buildTableName(3): {"user2"}, + buildTableName(4): {"user2"}, + }, + }, + { + name: "user index default: 3 days, user1: 2 days", + queryReadinessLimits: mockLimits{ + queryReadyIndexNumDaysDefault: 3, + queryReadyIndexNumDaysByUser: map[string]int{"user1": 2}, + }, + expectedQueryReadinessDoneForUsers: map[string][]string{ + buildTableName(0): {"user1", "user2"}, + buildTableName(1): {"user1", "user2"}, + buildTableName(2): {"user1", "user2"}, + buildTableName(3): {"user2"}, + }, }, } { t.Run(tc.name, func(t *testing.T) { - tempDir := t.TempDir() - - objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) - - tables := map[string]map[string]testutil.DBConfig{} - activeTableNumber := getActiveTableNumber() - for i := 0; i < 10; i++ { - tables[fmt.Sprintf("table_%d", activeTableNumber-int64(i))] = map[string]testutil.DBConfig{ - "db": { - CompressFile: i%2 == 0, - DBRecords: testutil.DBRecords{ - Start: i * 10, - NumRecords: 10, - }, - }, - } - } + resetTables() + tableManager.cfg.QueryReadyNumDays = tc.queryReadyNumDaysCfg + tableManager.cfg.Limits = &tc.queryReadinessLimits + require.NoError(t, tableManager.ensureQueryReadiness(context.Background())) - for name, dbs := range tables { - testutil.SetupDBsAtPath(t, filepath.Join(objectStoragePath, name), dbs, nil) + for 
name, table := range tableManager.tables { + require.Equal(t, tc.expectedQueryReadinessDoneForUsers[name], table.(*mockTable).queryReadinessDoneForUsers, "table: %s", name) } + }) + } +} - boltDBIndexClient, indexStorageClient := buildTestClients(t, tempDir) - cachePath := filepath.Join(tempDir, cacheDirName) - require.NoError(t, util.EnsureDirectory(cachePath)) - - cfg := Config{ - CacheDir: cachePath, - SyncInterval: time.Hour, - CacheTTL: time.Hour, - QueryReadyNumDays: tc.queryReadyNumDaysCfg, - } - tableManager := &TableManager{ - cfg: cfg, - boltIndexClient: boltDBIndexClient, - indexStorageClient: indexStorageClient, - tables: make(map[string]*Table), - metrics: newMetrics(nil), - ctx: context.Background(), - cancel: func() {}, - } +type mockLimits struct { + queryReadyIndexNumDaysDefault int + queryReadyIndexNumDaysByUser map[string]int +} - defer func() { - tableManager.Stop() - boltDBIndexClient.Stop() - }() +func (m *mockLimits) AllByUserID() map[string]*validation.Limits { + allByUserID := map[string]*validation.Limits{} + for userID := range m.queryReadyIndexNumDaysByUser { + allByUserID[userID] = &validation.Limits{ + QueryReadyIndexNumDays: m.queryReadyIndexNumDaysByUser[userID], + } + } - require.NoError(t, tableManager.ensureQueryReadiness()) + return allByUserID +} - if tc.queryReadyNumDaysCfg == 0 { - require.Len(t, tableManager.tables, 0) - } else { - require.Len(t, tableManager.tables, int(math.Min(float64(tc.queryReadyNumDaysCfg+1), 10))) - } - }) +func (m *mockLimits) DefaultLimits() *validation.Limits { + return &validation.Limits{ + QueryReadyIndexNumDays: m.queryReadyIndexNumDaysDefault, } } -func TestTableManager_tablesRequiredForQueryReadiness(t *testing.T) { - numDailyTablesInStorage := 10 - var tablesInStorage []string - // tables with daily table number - activeDailyTableNumber := getActiveTableNumber() - for i := 0; i < numDailyTablesInStorage; i++ { - tablesInStorage = append(tablesInStorage, fmt.Sprintf("table_%d", 
activeDailyTableNumber-int64(i))) - } +func (m *mockLimits) QueryReadyIndexNumDays(userID string) int { + return m.queryReadyIndexNumDaysByUser[userID] +} - // tables with weekly table number - activeWeeklyTableNumber := time.Now().Unix() / int64((durationDay*7)/time.Second) - for i := 0; i < 10; i++ { - tablesInStorage = append(tablesInStorage, fmt.Sprintf("table_%d", activeWeeklyTableNumber-int64(i))) - } +type mockTable struct { + tableExpired bool + queryReadinessDoneForUsers []string +} - // tables without a table number - tablesInStorage = append(tablesInStorage, "foo", "bar") +func (m *mockTable) Close() {} - for i, tc := range []int{ - 0, 5, 10, 20, - } { - t.Run(fmt.Sprint(i), func(t *testing.T) { - tableManager := &TableManager{ - cfg: Config{ - QueryReadyNumDays: tc, - }, - } +func (m *mockTable) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error { + return nil +} - tablesNames, err := tableManager.tablesRequiredForQueryReadiness(tablesInStorage) - require.NoError(t, err) +func (m *mockTable) DropUnusedIndex(ttl time.Duration, now time.Time) (bool, error) { + return m.tableExpired, nil +} - numExpectedTables := 0 - if tc != 0 { - numExpectedTables = int(math.Min(float64(tc+1), float64(numDailyTablesInStorage))) - } +func (m *mockTable) Sync(ctx context.Context) error { + return nil +} - for i := 0; i < numExpectedTables; i++ { - require.Equal(t, fmt.Sprintf("table_%d", activeDailyTableNumber-int64(i)), tablesNames[i]) - } - }) - } +func (m *mockTable) EnsureQueryReadiness(ctx context.Context, userIDs []string) error { + m.queryReadinessDoneForUsers = userIDs + return nil +} + +type mockIndexStorageClient struct { + storage.Client + tablesInStorage []string + userIndexesInTables map[string][]string +} + +func (m *mockIndexStorageClient) ListTables(ctx context.Context) ([]string, error) { + return m.tablesInStorage, nil +} + +func (m *mockIndexStorageClient) ListFiles(ctx context.Context, tableName 
string) ([]storage.IndexFile, []string, error) { + return []storage.IndexFile{}, m.userIndexesInTables[tableName], nil } diff --git a/pkg/storage/stores/shipper/downloads/table_test.go b/pkg/storage/stores/shipper/downloads/table_test.go index 3b9cc650fd059..2f9b97be7d287 100644 --- a/pkg/storage/stores/shipper/downloads/table_test.go +++ b/pkg/storage/stores/shipper/downloads/table_test.go @@ -67,12 +67,14 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, storag return boltDBIndexClient, storage.NewIndexStorageClient(fsObjectClient, "") } -func buildTestTable(t *testing.T, path string) (*Table, *local.BoltIndexClient, stopFunc) { +func buildTestTable(t *testing.T, path string) (*table, *local.BoltIndexClient, stopFunc) { boltDBIndexClient, storageClient := buildTestClients(t, path) cachePath := filepath.Join(path, cacheDirName) - table := NewTable(tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)) - require.NoError(t, table.EnsureQueryReadiness(context.Background())) + table := NewTable(tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)).(*table) + _, usersWithIndex, err := table.storageClient.ListFiles(context.Background(), tableName) + require.NoError(t, err) + require.NoError(t, table.EnsureQueryReadiness(context.Background(), usersWithIndex)) return table, boltDBIndexClient, func() { table.Close() @@ -130,7 +132,7 @@ func TestTable_MultiQueries(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - table := Table{ + table := table{ indexSets: map[string]IndexSet{}, logger: util_log.Logger, } @@ -237,7 +239,7 @@ func TestTable_DropUnusedIndex(t *testing.T) { expiredIndexUserID: &mockIndexSet{lastUsedAt: now.Add(-25 * time.Hour)}, } - table := Table{ + table := table{ indexSets: indexSets, logger: util_log.Logger, } @@ -270,47 +272,65 @@ func TestTable_DropUnusedIndex(t *testing.T) { func TestTable_EnsureQueryReadiness(t *testing.T) { tempDir := t.TempDir() + objectStoragePath := 
filepath.Join(tempDir, objectsStorageDirName) - dbsToSetup := map[string]testutil.DBConfig{ - "db1": { - CompressFile: true, - DBRecords: testutil.DBRecords{ - Start: 0, - NumRecords: 10, - }, + // setup table in storage with 1 common db and 2 users with a db each + testutil.SetupTable(t, filepath.Join(objectStoragePath, tableName), testutil.DBsConfig{ + DBRecordsStart: 0, + NumUnCompactedDBs: 1, + }, testutil.PerUserDBsConfig{ + DBsConfig: testutil.DBsConfig{ + DBRecordsStart: 100, + NumCompactedDBs: 1, }, - } + NumUsers: 2, + }) - objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) - tablePathInStorage := filepath.Join(objectStoragePath, tableName) - testutil.SetupDBsAtPath(t, tablePathInStorage, dbsToSetup, nil) + boltDBIndexClient, storageClient := buildTestClients(t, tempDir) + defer boltDBIndexClient.Stop() - table, _, stopFunc := buildTestTable(t, tempDir) - defer func() { - stopFunc() - }() + for _, tc := range []struct { + name string + usersToDoQueryReadinessFor []string + }{ + { + name: "only common index to be query ready", + }, + { + name: "one of the users to be query ready", + usersToDoQueryReadinessFor: []string{testutil.BuildUserID(0)}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + cachePath := t.TempDir() + table := NewTable(tableName, cachePath, storageClient, boltDBIndexClient, newMetrics(nil)).(*table) + defer func() { + table.Close() + }() + + // EnsureQueryReadiness should update the last used at time of common index set + require.NoError(t, table.EnsureQueryReadiness(context.Background(), tc.usersToDoQueryReadinessFor)) + require.Len(t, table.indexSets, len(tc.usersToDoQueryReadinessFor)+1) + for _, userID := range append(tc.usersToDoQueryReadinessFor, "") { + ensureIndexSetExistsInTable(t, table, userID) + require.InDelta(t, time.Now().Unix(), table.indexSets[userID].(*indexSet).lastUsedAt.Unix(), 5) + } + + // change the last used at to verify that it gets updated when we do the query readiness again + for _, idxSet := 
range table.indexSets { + idxSet.(*indexSet).lastUsedAt = time.Now().Add(-time.Hour) + } - require.Len(t, table.indexSets, 1) - ensureIndexSetExistsInTable(t, table, "") - - // EnsureQueryReadiness should update the last used at time of common index set - table.indexSets[""].(*indexSet).lastUsedAt = time.Now().Add(-time.Hour) - require.NoError(t, table.EnsureQueryReadiness(context.Background())) - require.Len(t, table.indexSets, 1) - ensureIndexSetExistsInTable(t, table, "") - require.InDelta(t, time.Now().Unix(), table.indexSets[""].(*indexSet).lastUsedAt.Unix(), 5) - - testutil.SetupDBsAtPath(t, filepath.Join(tablePathInStorage, userID), dbsToSetup, nil) - - // Running EnsureQueryReadiness should initialize newly setup index for userID. - // Running it multiple times should behave similarly. - for i := 0; i < 2; i++ { - require.NoError(t, table.EnsureQueryReadiness(context.Background())) - require.Len(t, table.indexSets, 2) - ensureIndexSetExistsInTable(t, table, "") - ensureIndexSetExistsInTable(t, table, userID) - require.InDelta(t, time.Now().Unix(), table.indexSets[""].(*indexSet).lastUsedAt.Unix(), 5) - require.InDelta(t, time.Now().Unix(), table.indexSets[userID].(*indexSet).lastUsedAt.Unix(), 5) + // Running it multiple times should not have an impact other than updating last used at time + for i := 0; i < 2; i++ { + require.NoError(t, table.EnsureQueryReadiness(context.Background(), tc.usersToDoQueryReadinessFor)) + require.Len(t, table.indexSets, len(tc.usersToDoQueryReadinessFor)+1) + for _, userID := range append(tc.usersToDoQueryReadinessFor, "") { + ensureIndexSetExistsInTable(t, table, userID) + require.InDelta(t, time.Now().Unix(), table.indexSets[userID].(*indexSet).lastUsedAt.Unix(), 5) + } + } + }) } } @@ -555,7 +575,7 @@ func TestLoadTable(t *testing.T) { testutil.TestSingleTableQuery(t, userID, []chunk.IndexQuery{{}}, table, 0, 40) } -func ensureIndexSetExistsInTable(t *testing.T, table *Table, indexSetName string) { +func 
ensureIndexSetExistsInTable(t *testing.T, table *table, indexSetName string) { _, ok := table.indexSets[indexSetName] require.True(t, ok) } diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 5df7e9482a2d6..e78d6b6c36db0 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -78,7 +78,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries") f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") - f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.") + f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of common index to be kept downloaded for queries. 
For per tenant index query readiness, use limits overrides config.") f.BoolVar(&cfg.BuildPerTenantIndex, "boltdb.shipper.build-per-tenant-index", false, "Build per tenant index files") } @@ -97,13 +97,13 @@ type Shipper struct { } // NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (chunk.IndexClient, error) { +func NewShipper(cfg Config, storageClient chunk.ObjectClient, limits downloads.Limits, registerer prometheus.Registerer) (chunk.IndexClient, error) { shipper := Shipper{ cfg: cfg, metrics: newMetrics(registerer), } - err := shipper.init(storageClient, registerer) + err := shipper.init(storageClient, limits, registerer) if err != nil { return nil, err } @@ -113,7 +113,7 @@ func NewShipper(cfg Config, storageClient chunk.ObjectClient, registerer prometh return &shipper, nil } -func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error { +func (s *Shipper) init(storageClient chunk.ObjectClient, limits downloads.Limits, registerer prometheus.Registerer) error { // When we run with target querier we don't have ActiveIndexDirectory set so using CacheLocation instead. // Also it doesn't matter which directory we use since BoltDBIndexClient doesn't do anything with it but it is good to have a valid path. 
boltdbIndexClientDir := s.cfg.ActiveIndexDirectory @@ -156,6 +156,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R SyncInterval: s.cfg.ResyncInterval, CacheTTL: s.cfg.CacheTTL, QueryReadyNumDays: s.cfg.QueryReadyNumDays, + Limits: limits, } downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) if err != nil { diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index 4c8e762601c8d..7039a99374060 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -15,20 +15,15 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/local" - "github.com/grafana/loki/pkg/storage/chunk/storage" ) -var metrics = storage.NewClientMetrics() - func Test_LeaderElection(t *testing.T) { stabilityCheckInterval = 100 * time.Millisecond result := make(chan *ClusterSeed, 10) - objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ - FSConfig: local.FSConfig{ - Directory: t.TempDir(), - }, - }, metrics) + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: t.TempDir(), + }) require.NoError(t, err) for i := 0; i < 3; i++ { go func() { @@ -86,11 +81,9 @@ func Test_ReportLoop(t *testing.T) { })) usageStatsURL = server.URL - objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ - FSConfig: local.FSConfig{ - Directory: t.TempDir(), - }, - }, metrics) + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: t.TempDir(), + }) require.NoError(t, err) r, err := NewReporter(Config{Leader: true}, kv.Config{ @@ -149,11 +142,9 @@ func Test_NextReport(t *testing.T) { } func TestWrongKV(t *testing.T) { - objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ - FSConfig: local.FSConfig{ - Directory: t.TempDir(), - }, - }, metrics) + objectClient, err := 
local.NewFSObjectClient(local.FSConfig{ + Directory: t.TempDir(), + }) require.NoError(t, err) r, err := NewReporter(Config{Leader: true}, kv.Config{ diff --git a/pkg/usagestats/seed_test.go b/pkg/usagestats/seed_test.go index 0d30e7bd31926..c6969df3801bd 100644 --- a/pkg/usagestats/seed_test.go +++ b/pkg/usagestats/seed_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/chunk/local" - "github.com/grafana/loki/pkg/storage/chunk/storage" ) type dnsProviderMock struct { @@ -61,11 +60,9 @@ func createMemberlist(t *testing.T, port, memberID int) *memberlist.KV { func Test_Memberlist(t *testing.T) { stabilityCheckInterval = time.Second - objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ - FSConfig: local.FSConfig{ - Directory: t.TempDir(), - }, - }, metrics) + objectClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: t.TempDir(), + }) require.NoError(t, err) result := make(chan *ClusterSeed, 10) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index cb6c4fef4c0e3..8254de1f8348f 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -78,6 +78,7 @@ type Limits struct { MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` + QueryReadyIndexNumDays int `yaml:"query_ready_index_num_days" json:"query_ready_index_num_days"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. 
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` @@ -171,6 +172,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.") + f.IntVar(&l.QueryReadyIndexNumDays, "store.query-ready-index-num-days", 0, "Number of days of index to be kept always downloaded for queries. Applies only to per user index in boltdb-shipper index store. 0 to disable.") _ = l.RulerEvaluationDelay.Set("0s") f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.") @@ -358,6 +360,11 @@ func (o *Overrides) MaxQueriersPerUser(userID string) int { return o.getOverridesForUser(userID).MaxQueriersPerTenant } +// QueryReadyIndexNumDays returns the number of days for which we have to be query ready for a user. +func (o *Overrides) QueryReadyIndexNumDays(userID string) int { + return o.getOverridesForUser(userID).QueryReadyIndexNumDays +} + // MaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel. func (o *Overrides) MaxQueryParallelism(userID string) int {