Skip to content

Commit

Permalink
Changes for running Table Manager with loki in single binary (grafana…
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored and slim-bean committed May 31, 2019
1 parent 2b7823f commit d7d479a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 17 deletions.
35 changes: 19 additions & 16 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ type Config struct {
Target moduleName `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`

Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
Server server.Config `yaml:"server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
}

// RegisterFlags registers flag.
Expand All @@ -55,19 +56,21 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.ChunkStoreConfig.RegisterFlags(f)
c.SchemaConfig.RegisterFlags(f)
c.LimitsConfig.RegisterFlags(f)
c.TableManager.RegisterFlags(f)
}

// Loki is the root datastructure for Loki.
type Loki struct {
cfg Config

server *server.Server
ring *ring.Ring
overrides *validation.Overrides
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
store chunk.Store
server *server.Server
ring *ring.Ring
overrides *validation.Overrides
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
store chunk.Store
tableManager *chunk.TableManager

httpAuthMiddleware middleware.Interface
}
Expand Down
65 changes: 64 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package loki
import (
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/go-kit/kit/log/level"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
Expand All @@ -19,6 +24,8 @@ import (
"github.com/grafana/loki/pkg/querier"
)

const maxChunkAgeForTableManager = 12 * time.Hour

type moduleName int

// The various modules that make up Loki.
Expand All @@ -30,6 +37,7 @@ const (
Ingester
Querier
Store
TableManager
All
)

Expand All @@ -49,6 +57,8 @@ func (m moduleName) String() string {
return "ingester"
case Querier:
return "querier"
case TableManager:
return "table-manager"
case All:
return "all"
default:
Expand Down Expand Up @@ -79,6 +89,9 @@ func (m *moduleName) Set(s string) error {
case "querier":
*m = Querier
return nil
case "table-manager":
*m = TableManager
return nil
case "all":
*m = All
return nil
Expand Down Expand Up @@ -155,6 +168,50 @@ func (t *Loki) stopIngester() error {
return nil
}

func (t *Loki) initTableManager() error {
err := t.cfg.SchemaConfig.Load()
if err != nil {
return err
}

// Assume the newest config is the one to use
lastConfig := &t.cfg.SchemaConfig.Configs[len(t.cfg.SchemaConfig.Configs)-1]

if (t.cfg.TableManager.ChunkTables.WriteScale.Enabled ||
t.cfg.TableManager.IndexTables.WriteScale.Enabled ||
t.cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled ||
t.cfg.TableManager.IndexTables.InactiveWriteScale.Enabled ||
t.cfg.TableManager.ChunkTables.ReadScale.Enabled ||
t.cfg.TableManager.IndexTables.ReadScale.Enabled ||
t.cfg.TableManager.ChunkTables.InactiveReadScale.Enabled ||
t.cfg.TableManager.IndexTables.InactiveReadScale.Enabled) &&
(t.cfg.StorageConfig.AWSStorageConfig.ApplicationAutoScaling.URL == nil && t.cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "") {
level.Error(util.Logger).Log("msg", "WriteScale is enabled but no ApplicationAutoScaling or Metrics URL has been provided")
os.Exit(1)
}

tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig)
if err != nil {
return err
}

bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig)
util.CheckFatal("initializing bucket client", err)

t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient)
if err != nil {
return err
}

t.tableManager.Start()
return nil
}

func (t *Loki) stopTableManager() error {
t.tableManager.Stop()
return nil
}

func (t *Loki) initStore() (err error) {
t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
return
Expand Down Expand Up @@ -253,7 +310,13 @@ var modules = map[moduleName]module{
init: (*Loki).initQuerier,
},

TableManager: {
deps: []moduleName{Server},
init: (*Loki).initTableManager,
stop: (*Loki).stopTableManager,
},

All: {
deps: []moduleName{Querier, Ingester, Distributor},
deps: []moduleName{Querier, Ingester, Distributor, TableManager},
},
}

0 comments on commit d7d479a

Please sign in to comment.