Skip to content

Commit

Permalink
Loki: Modifies TableManager to use IndexGateway ring (#5972)
Browse files Browse the repository at this point in the history
* Add two metrics to the IndexGateway.

- Add a new `query_readiness_duration_seconds` metric, that reports
  query readiness duration of a tablemanager/index gateway instance. We
  should use it later to report performance against the ring mode
- Add a new `usersToBeQueryReadyForTotal` metric, that reports number of
  users involved in the query readiness operation. We should use it
  later to correlate number of users with the query readiness duration.

* Remove `usersToBeQueryReadyForTotal`.

- It will report all users always for now, so it isn't too helpful the
  way it is.

* Rename metric help text to not mislead people.

* Log queryReadiness duration.

* Fix where log message and duration and triggered.

* Add two metrics to the IndexGateway.

- Add a new `query_readiness_duration_seconds` metric, that reports
  query readiness duration of a tablemanager/index gateway instance. We
  should use it later to report performance against the ring mode
- Add a new `usersToBeQueryReadyForTotal` metric, that reports number of
  users involved in the query readiness operation. We should use it
  later to correlate number of users with the query readiness duration.

* Remove `usersToBeQueryReadyForTotal`.

- It will report all users always for now, so it isn't too helpful the
  way it is.

* Rename metric help text to not mislead people.

* Log queryReadiness duration.

* Fix where log message and duration and triggered.

* Use boundaries to skip users in TableManager.

- Separate the assigning of indexClient in the IndexGateway to allow
  initializing the shipper with the IndexGateway ring
- Add new TenantBoundariesClient entity, that answers if a given
  tenantID should be ignored or not
- Use the TenantBoundariesClient implemented by the IndexGateway in the
  downloads TableManager

* Add IndexGateway configuration docs.

* Add tests for pkg/util/ring TokenFor() and IsAssignedKey()

Signed-off-by: JordanRushing <rushing.jordan@gmail.com>

* Small whitespace fix in pkg/util/ring_test.go

Signed-off-by: JordanRushing <rushing.jordan@gmail.com>

* Apply suggestions from code review

Rewrite QueryIndex error phrase.

Co-authored-by: JordanRushing <rushing.jordan@gmail.com>

* Join users list in a single string.

- This is necessary since go-kit doesn't support array type.

* Tweak queryReadiness log messages.

- As suggested by Ed on
  #5972 (comment) and
  #5972 (comment)

* Implement new RingManager.

- Adds a new entity to indexgateway named RingManager, responsible for
  managing the ring and the lifecycler used by the indexgateway. The
  ringManager is guaranteed to be initiatiated before the Shipper and
  before the IndexGateway.
- Remove the readiness logic from the IndexGateway.
- Pass the RingManager as the TenantBoundaries implementation of the
  Shipper

* Return non-empty ringmanager for all modes.

* Fix lint regarding error not being check.

* Fix lint due to wrong import order.

* Implement support for client and server mode for the ring manager.

* Fix ring manager services registration.

* Add option to configure whether or not to log gateway requests.

* Check if IndexGateway is enabled instead of active.

* Tune RingManager behavior.

- Instantiate ring buffers inside IsAssignedKey instead of reusing same
  buffer to avoid issues with concurrency.
- Remove unnecessary details from IndexGateway mode docs.
- Return error when wrongly instantiating a RingManager when
  IndexGateway is in simple mode.

* Rewrite `TenantInBoundaries` as a func instead of interface.

* Fix lint.

- Fix YAML tag
- Remove else clause

* Use distributor instead of querier in test.

- Since the querier needs to wait for the index gateway ring, it isn't
  suitable for this test anymore.

* Add ring mode guard clause.

* Return code erased by mistake.

* Rename TenantInBoundaries to IndexGatewayOwnsTenant.

* Log len of users instead of all of them to avoid log msgs.

* Add docstrings to new public functions.

* Apply suggestions from code review

Document that tenant filtering is only applied during query readiness.

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

* Modify IsAssignedKey to expect address directly instead of lifecycler.

- Also removes HasDedicatedAddress, since it isn't necessary anymore

* Log error message when IsInReplicationSet fails.

* Modify IsAssignedKey to return true by default.

* Remove wrong assigning of Service for clientMode.

* Log gateway requests before error checks.

* Remove unnecessary endpoint registration.

* Fix lint.

* Update pkg/util/ring.go

Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>

Co-authored-by: JordanRushing <rushing.jordan@gmail.com>
Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
  • Loading branch information
3 people authored May 20, 2022
1 parent b90c460 commit 440fb7d
Show file tree
Hide file tree
Showing 16 changed files with 501 additions and 207 deletions.
27 changes: 27 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# key value store.
[ingester: <ingester>]

# Configures the index gateway server.
[index_gateway: <index_gateway>]

# Configures where Loki will store data.
[storage_config: <storage_config>]

Expand Down Expand Up @@ -1757,6 +1760,10 @@ boltdb_shipper:
# The CLI flags prefix for this block config is: boltdb.shipper.index-gateway-client
[grpc_client_config: <grpc_client_config>]
# Configures if gateway requests should be logged or not.
# CLI flag: -boltdb.shipper.index-gateway-client.log-gateway-requests
[log_gateway_requests: <bool> | default = false]
# Cache validity for active index entries. Should be no higher than
# the chunk_idle_period in the ingester settings.
# CLI flag: -store.index-cache-validity
Expand Down Expand Up @@ -2398,6 +2405,26 @@ backoff_config:
[max_retries: <int> | default = 10]
```

## index_gateway

The `index_gateway` block configures the Loki index gateway server, responsible for serving index queries
without the need to constantly interact with the object store.

```yaml
# Defines in which mode the index gateway server will operate (default to 'simple').
# It supports two modes:
# 'simple': an index gateway server instance is responsible for handling,
# storing and returning requests for all indices for all tenants.
# 'ring': an index gateway server instance is responsible for a subset of tenants instead
# of all tenants.
[mode: <string> | default = simple]
# Defines the ring to be used by the index gateway servers and clients in case the servers
# are configured to run in 'ring' mode. In case this isn't configured, this block supports
# inheriting configuration from the common ring section.
[ring: <ring>]
```

## table_manager

The `table_manager` block configures the Loki table-manager.
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ type Loki struct {
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
usageReport *usagestats.Reporter
indexGatewayRing *ring.Ring
indexGatewayRingManager *indexgateway.RingManager

clientMetrics storage.ClientMetrics

Expand Down Expand Up @@ -512,7 +512,7 @@ func (t *Loki) setupModuleManager() error {
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
TableManager: {Server, UsageReport},
Compactor: {Server, Overrides, MemberlistKV, UsageReport},
IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV},
IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Expand Down
37 changes: 14 additions & 23 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/NYTimes/gziphandler"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/kv/memberlist"
"github.com/grafana/dskit/ring"
Expand Down Expand Up @@ -386,7 +385,9 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// Always set these configs
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing
if t.Cfg.IndexGateway.Mode == indexgateway.RingMode {
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRingManager.Ring
}

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
Expand Down Expand Up @@ -855,7 +856,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort

indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand All @@ -864,8 +865,6 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
return nil, err
}

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(gateway)

indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway)
return gateway, nil
}
Expand All @@ -878,29 +877,21 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
ringCfg := t.Cfg.IndexGateway.Ring.ToRingConfig(t.Cfg.IndexGateway.Ring.ReplicationFactor)
reg := prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer)

logger := util_log.Logger
ringStore, err := kv.NewClient(
ringCfg.KVStore,
ring.GetCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", reg), "index-gateway"),
logger,
)
if err != nil {
return nil, gerrors.Wrap(err, "kv new client")
managerMode := indexgateway.ClientMode
if t.Cfg.isModuleEnabled(IndexGateway) {
managerMode = indexgateway.ServerMode
}
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer)

t.indexGatewayRing, err = ring.NewWithStoreClientAndStrategy(
ringCfg, indexgateway.RingIdentifier, indexgateway.RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", reg), logger,
)
if err != nil {
return nil, gerrors.Wrap(err, "new with store client and strategy")
return nil, gerrors.Wrap(err, "new index gateway ring manager")
}

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRing)
return t.indexGatewayRing, nil
t.indexGatewayRingManager = rm

t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRingManager)
return t.indexGatewayRingManager, nil
}

func (t *Loki) initQueryScheduler() (services.Service, error) {
Expand Down Expand Up @@ -962,7 +953,7 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {

deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer)
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (cfg *Config) Validate() error {
}

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, registerer prometheus.Registerer) (index.Client, error) {
func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) (index.Client, error) {
switch name {
case config.StorageTypeInMemory:
store := testutils.NewMockStorage()
Expand Down Expand Up @@ -171,7 +171,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi
return nil, err
}

boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, registerer)
boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, ownsTenantFn, registerer)

return boltDBIndexClientWithShipper, err
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
}, nil
}

idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, indexClientReg)
idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/storage/stores/shipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -32,6 +31,11 @@ type Limits interface {
DefaultLimits() *validation.Limits
}

// IndexGatewayOwnsTenant is invoked by an IndexGateway instance and answers whether if the given tenant is assigned to this instance or not.
//
// It is only relevant by an IndexGateway in the ring mode and if it returns false for a given tenant, that tenant will be ignored by this IndexGateway during query readiness.
type IndexGatewayOwnsTenant func(tenant string) bool

type Config struct {
CacheDir string
SyncInterval time.Duration
Expand All @@ -52,9 +56,11 @@ type TableManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

ownsTenant IndexGatewayOwnsTenant
}

func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient storage.Client, registerer prometheus.Registerer) (*TableManager, error) {
func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient storage.Client, ownsTenantFn IndexGatewayOwnsTenant, registerer prometheus.Registerer) (*TableManager, error) {
if err := chunk_util.EnsureDirectory(cfg.CacheDir); err != nil {
return nil, err
}
Expand All @@ -64,6 +70,7 @@ func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorage
cfg: cfg,
boltIndexClient: boltIndexClient,
indexStorageClient: indexStorageClient,
ownsTenant: ownsTenantFn,
tables: make(map[string]Table),
metrics: newMetrics(registerer),
ctx: ctx,
Expand Down Expand Up @@ -319,8 +326,7 @@ func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error {
if err := table.EnsureQueryReadiness(ctx, usersToBeQueryReadyFor); err != nil {
return err
}
joinedUsers := strings.Join(usersToBeQueryReadyFor, ",")
level.Info(util_log.Logger).Log("msg", "index pre-download for query readiness completed", "users", joinedUsers, "duration", time.Since(perTableStart), "table", tableName)
level.Info(util_log.Logger).Log("msg", "index pre-download for query readiness completed", "users_len", len(usersToBeQueryReadyFor), "duration", time.Since(perTableStart), "table", tableName)
}

return nil
Expand All @@ -345,6 +351,10 @@ func (tm *TableManager) findUsersInTableForQueryReadiness(tableNumber int64, use
continue
}

if tm.ownsTenant != nil && !tm.ownsTenant(userID) {
continue
}

if activeTableNumber-tableNumber <= int64(queryReadyNumDays) {
usersToBeQueryReadyFor = append(usersToBeQueryReadyFor, userID)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/downloads/table_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc)
CacheTTL: time.Hour,
Limits: &mockLimits{},
}
tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil)
tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil, nil)
require.NoError(t, err)

return tableManager, func() {
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/stores/shipper/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type IndexGatewayClientConfig struct {
// Forcefully disable the use of the index gateway client for the storage.
// This is mainly useful for the index-gateway component which should always use the storage.
Disabled bool `yaml:"-"`

// LogGatewayRequests configures if requests sent to the gateway should be logged or not.
// The log messages are of type debug and contain the address of the gateway and the relevant tenant.
LogGatewayRequests bool `yaml:"log_gateway_requests"`
}

// RegisterFlagsWithPrefix register client-specific flags with the given prefix.
Expand All @@ -75,6 +79,7 @@ type IndexGatewayClientConfig struct {
func (i *IndexGatewayClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
f.StringVar(&i.Address, prefix+".server-address", "", "Hostname or IP of the Index Gateway gRPC server running in simple mode.")
f.BoolVar(&i.LogGatewayRequests, prefix+".log-gateway-requests", false, "Whether requests sent to the gateway should be logged or not.")
}

func (i *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) {
Expand Down Expand Up @@ -312,6 +317,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client ind
})
var lastErr error
for _, addr := range addrs {
if s.cfg.LogGatewayRequests {
level.Debug(util_log.Logger).Log("msg", "sending request to gateway", "gateway", addr, "tenant", userID)
}

genericClient, err := s.pool.GetClientFor(addr)
if err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to get client for instance %s", addr), "err", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func benchmarkIndexQueries(b *testing.B, queries []index.Query) {
CacheTTL: 15 * time.Minute,
QueryReadyNumDays: 30,
Limits: mockLimits{},
}, bclient, storage.NewIndexStorageClient(fs, "index/"), nil)
}, bclient, storage.NewIndexStorageClient(fs, "index/"), nil, nil)
require.NoError(b, err)

// initialize the index gateway server
Expand Down
Loading

0 comments on commit 440fb7d

Please sign in to comment.