diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 9fc21207cecd0..eb6bc8c521996 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -27,15 +27,16 @@ type Config struct { Target moduleName `yaml:"target,omitempty"` AuthEnabled bool `yaml:"auth_enabled,omitempty"` - Server server.Config `yaml:"server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - IngesterClient client.Config `yaml:"ingester_client,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage_config,omitempty"` - ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"` - SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"` - LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage_config,omitempty"` + ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"` + SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"` + LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"` } // RegisterFlags registers flag. @@ -55,19 +56,21 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.ChunkStoreConfig.RegisterFlags(f) c.SchemaConfig.RegisterFlags(f) c.LimitsConfig.RegisterFlags(f) + c.TableManager.RegisterFlags(f) } // Loki is the root datastructure for Loki. type Loki struct { cfg Config - server *server.Server - ring *ring.Ring - overrides *validation.Overrides - distributor *distributor.Distributor - ingester *ingester.Ingester - querier *querier.Querier - store chunk.Store + server *server.Server + ring *ring.Ring + overrides *validation.Overrides + distributor *distributor.Distributor + ingester *ingester.Ingester + querier *querier.Querier + store chunk.Store + tableManager *chunk.TableManager httpAuthMiddleware middleware.Interface } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 54940eaa1ed5b..9f60b6bbb7d46 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -3,12 +3,17 @@ package loki import ( "fmt" "net/http" + "os" "strings" + "time" + "github.com/go-kit/kit/log/level" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -19,6 +24,8 @@ import ( "github.com/grafana/loki/pkg/querier" ) +const maxChunkAgeForTableManager = 12 * time.Hour + type moduleName int // The various modules that make up Loki. @@ -30,6 +37,7 @@ const ( Ingester Querier Store + TableManager All ) @@ -49,6 +57,8 @@ func (m moduleName) String() string { return "ingester" case Querier: return "querier" + case TableManager: + return "table-manager" case All: return "all" default: @@ -79,6 +89,9 @@ func (m *moduleName) Set(s string) error { case "querier": *m = Querier return nil + case "table-manager": + *m = TableManager + return nil case "all": *m = All return nil @@ -155,6 +168,50 @@ func (t *Loki) stopIngester() error { return nil } +func (t *Loki) initTableManager() error { + err := t.cfg.SchemaConfig.Load() + if err != nil { + return err + } + + // Assume the newest config is the one to use + lastConfig := &t.cfg.SchemaConfig.Configs[len(t.cfg.SchemaConfig.Configs)-1] + + if (t.cfg.TableManager.ChunkTables.WriteScale.Enabled || + t.cfg.TableManager.IndexTables.WriteScale.Enabled || + t.cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || + t.cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || + t.cfg.TableManager.ChunkTables.ReadScale.Enabled || + t.cfg.TableManager.IndexTables.ReadScale.Enabled || + t.cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || + t.cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && + (t.cfg.StorageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil && t.cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "") { + level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided") + os.Exit(1) + } + + tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig) + if err != nil { + return err + } + + bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig) + util.CheckFatal("initializing bucket client", err) + + t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient) + if err != nil { + return err + } + + t.tableManager.Start() + return nil +} + +func (t *Loki) stopTableManager() error { + t.tableManager.Stop() + return nil +} + func (t *Loki) initStore() (err error) { t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) return @@ -253,7 +310,13 @@ var modules = map[moduleName]module{ init: (*Loki).initQuerier, }, + TableManager: { + deps: []moduleName{Server}, + init: (*Loki).initTableManager, + stop: (*Loki).stopTableManager, + }, + All: { - deps: []moduleName{Querier, Ingester, Distributor}, + deps: []moduleName{Querier, Ingester, Distributor, TableManager}, }, }